Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/09/21 00:38:29 UTC

[GitHub] [spark] ueshin opened a new pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

ueshin opened a new pull request #34053:
URL: https://github.com/apache/spark/pull/34053


   ### What changes were proposed in this pull request?
   
   Proposes an infrastructure for as-of join and implements `ps.merge_asof` here.
   
   1. Introduce `AsOfJoin` logical plan
   2. Rewrite the plan in the optimize phase:
   
   - From something like the following (the SQL syntax is not yet determined):
   
   ```sql
   SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
   ```
   
   - To
   
   ```sql
   SELECT left.*, __right__.*
   FROM (
        SELECT
             left.*,
             (
                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
                  FROM right
                  WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance
             ) as __right__
        FROM left
        )
   ```
   
   3. The rewritten scalar-subquery will be handled by the existing decorrelation framework.
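
To make the rewrite above concrete, here is a minimal plain-Python sketch of what the rewritten query computes: for every left row, filter the right rows with the join condition and the as-of bounds, then keep the one that minimizes `left.t - right.t`. The names `min_by` and `asof_join_rewritten`, the dict-based rows, and the sample data are assumptions made for illustration; this is not how Spark executes the plan, since the scalar subquery is decorrelated before execution.

```python
def min_by(rows, key):
    """Return the row with the smallest key, or None for empty input.

    Ties keep the first row seen; SQL MIN_BY makes no such promise.
    """
    best, best_key = None, None
    for row in rows:
        k = key(row)
        if best is None or k < best_key:
            best, best_key = row, k
    return best


def asof_join_rewritten(left, right, condition, tolerance=None):
    """Backward as-of join, written as a per-left-row scalar subquery."""
    joined = []
    for l in left:
        # WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance
        filtered = [
            r for r in right
            if condition(l, r)
            and l["t"] >= r["t"]
            and (tolerance is None or r["t"] >= l["t"] - tolerance)
        ]
        # SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
        nearest = min_by(filtered, key=lambda r: l["t"] - r["t"])
        joined.append((l, nearest))  # nearest is None when nothing matched
    return joined


# Illustrative rows only; "k" plays the role of the join condition column.
left = [{"k": "a", "t": 5}, {"k": "b", "t": 5}, {"k": "a", "t": 10}]
right = [{"k": "a", "t": 1}, {"k": "a", "t": 4}, {"k": "a", "t": 6}]
same_key = lambda l, r: l["k"] == r["k"]

pairs = asof_join_rewritten(left, right, same_key)
pairs_with_tolerance = asof_join_rewritten(left, right, same_key, tolerance=2)
```

A left row paired with `None` corresponds to a left outer result row with nulls on the right side; an inner as-of join would drop such rows.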
   
   Note: APIs on SQL DataFrames and SQL syntax are TBD ([SPARK-22947](https://issues.apache.org/jira/browse/SPARK-22947)), although there are temporary APIs added here.
   
   ### Why are the changes needed?
   
   Pandas' `merge_asof` or as-of join for SQL/DataFrame is useful for time series analysis.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. `ps.merge_asof` can be used.
   
   ```py
   >>> quotes
                        time ticker     bid     ask
   0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
   1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
   2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
   3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
   4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
   5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
   6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
   7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
   
   >>> trades
                        time ticker   price  quantity
   0 2016-05-25 13:30:00.023   MSFT   51.95        75
   1 2016-05-25 13:30:00.038   MSFT   51.95       155
   2 2016-05-25 13:30:00.048   GOOG  720.77       100
   3 2016-05-25 13:30:00.048   GOOG  720.92       100
   4 2016-05-25 13:30:00.048   AAPL   98.00       100
   
   >>> ps.merge_asof(
   ...    trades, quotes, on="time", by="ticker"
   ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
                        time ticker   price  quantity     bid     ask
   0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
   1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
   2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
   3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
   4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
   
   >>> ps.merge_asof(
   ...     trades,
   ...     quotes,
   ...     on="time",
   ...     by="ticker",
   ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
   ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
                        time ticker   price  quantity     bid     ask
   0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
   1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
   2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
   3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
   4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
   ```
   
   Note: Because `IntervalType` literals are not supported yet, the `IntervalType` value has to be specified with `F.expr` as a workaround.
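
The two result tables above can be checked without Spark. The following is a plain-Python sketch, not the pandas-on-Spark implementation: it groups the quotes by ticker and uses a binary search to find the latest quote at or before each trade. Times are written as milliseconds after 2016-05-25 13:30:00, missing values appear as `None` rather than `NaN`, and the function name `merge_asof_backward` and its `tolerance_ms` parameter are made up for this illustration.

```python
import bisect
from collections import defaultdict

# (time_ms, ticker, bid, ask), copied from the quotes table above.
quotes = [
    (23, "GOOG", 720.50, 720.93),
    (23, "MSFT", 51.95, 51.96),
    (30, "MSFT", 51.97, 51.98),
    (41, "MSFT", 51.99, 52.00),
    (48, "GOOG", 720.50, 720.93),
    (49, "AAPL", 97.99, 98.01),
    (72, "GOOG", 720.50, 720.88),
    (75, "MSFT", 52.01, 52.03),
]
# (time_ms, ticker, price, quantity), copied from the trades table above.
trades = [
    (23, "MSFT", 51.95, 75),
    (38, "MSFT", 51.95, 155),
    (48, "GOOG", 720.77, 100),
    (48, "GOOG", 720.92, 100),
    (48, "AAPL", 98.00, 100),
]


def merge_asof_backward(trades, quotes, tolerance_ms=None):
    """Attach to each trade the latest same-ticker quote at or before it."""
    by_ticker = defaultdict(list)
    for time, ticker, bid, ask in sorted(quotes):
        by_ticker[ticker].append((time, bid, ask))

    merged = []
    for time, ticker, price, quantity in trades:
        rows = by_ticker.get(ticker, [])
        times = [row[0] for row in rows]
        # Index of the last quote whose time is <= the trade time, or -1.
        i = bisect.bisect_right(times, time) - 1
        bid = ask = None
        if i >= 0 and (tolerance_ms is None or time - times[i] <= tolerance_ms):
            _, bid, ask = rows[i]
        merged.append((time, ticker, price, quantity, bid, ask))
    return merged


no_tolerance = merge_asof_backward(trades, quotes)
within_2ms = merge_asof_backward(trades, quotes, tolerance_ms=2)
```

With no tolerance, the 13:30:00.038 MSFT trade picks up the 13:30:00.030 quote; with a 2 ms tolerance that quote is 8 ms too old and the trade gets no match, which agrees with the second table.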
   
   ### How was this patch tested?
   
   Added tests.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716520340



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case _ =>
+        left.output ++ right.output
+    }
+  }
+
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+  override lazy val resolved: Boolean = {
+    childrenResolved &&
+      expressions.forall(_.resolved) &&
+      duplicateResolved &&
+      asOfCondition.dataType == BooleanType &&
+      condition.forall(_.dataType == BooleanType) &&
+      toleranceAssertion.forall { assertion =>
+        assertion.foldable && assertion.eval().asInstanceOf[Boolean]
+      }
+  }
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(AS_OF_JOIN)
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): AsOfJoin = {
+    copy(left = newLeft, right = newRight)
+  }
+}
+
+object AsOfJoin {
+
+  def apply(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      condition: Option[Expression],
+      joinType: JoinType,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): AsOfJoin = {
+    val asOfCond = makeAsOfCond(leftAsOf, rightAsOf, tolerance, allowExactMatches, direction)
+    val orderingExpr = makeOrderingExpr(leftAsOf, rightAsOf, direction)
+    AsOfJoin(left, right, asOfCond, condition, joinType,
+      orderingExpr, tolerance.map(t => GreaterThanOrEqual(t, Literal.default(t.dataType))))
+  }
+
+  private def makeAsOfCond(
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): Expression = {
+    val base = (allowExactMatches, direction) match {
+      case (true, Backward) => GreaterThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Backward) => GreaterThan(leftAsOf, rightAsOf)
+      case (true, Forward) => LessThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Forward) => LessThan(leftAsOf, rightAsOf)
+      case (true, Nearest) => Literal.TrueLiteral
+      case (false, Nearest) => Not(EqualTo(leftAsOf, rightAsOf))
+    }
+    tolerance match {
+      case Some(tolerance) =>
+        (allowExactMatches, direction) match {
+          case (true, Backward) =>
+            And(base, GreaterThanOrEqual(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (false, Backward) =>
+            And(base, GreaterThan(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (true, Forward) =>
+            And(base, LessThanOrEqual(rightAsOf, Add(leftAsOf, tolerance)))

Review comment:
       shall we use `TimeAdd` expression in case the `tolerance` is interval type?
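
For readers following the quoted `makeAsOfCond` hunk, the case analysis it shows can be sketched in Python as below. This is only an illustration over plain numbers: `BASE_PREDICATES` and `as_of_matches` are hypothetical names, and the tolerance branches that the hunk does not show are deliberately left unimplemented rather than guessed. It does not address the interval-type question raised in the review comment.

```python
import operator

# (allow_exact_matches, direction) -> comparison called as op(left_as_of, right_as_of).
# These six entries mirror the `base` match in the quoted hunk.
BASE_PREDICATES = {
    (True, "backward"): operator.ge,
    (False, "backward"): operator.gt,
    (True, "forward"): operator.le,
    (False, "forward"): operator.lt,
    (True, "nearest"): lambda left, right: True,
    (False, "nearest"): operator.ne,
}


def as_of_matches(left_as_of, right_as_of, allow_exact_matches=True,
                  direction="backward", tolerance=None):
    """Evaluate the as-of condition for one pair of as-of values."""
    case = (allow_exact_matches, direction)
    base = BASE_PREDICATES[case](left_as_of, right_as_of)
    if tolerance is None:
        return base
    if case == (True, "backward"):
        return base and right_as_of >= left_as_of - tolerance
    if case == (False, "backward"):
        return base and right_as_of > left_as_of - tolerance
    if case == (True, "forward"):
        return base and right_as_of <= left_as_of + tolerance
    # The remaining tolerance branches are cut off in the quoted hunk.
    raise NotImplementedError(f"tolerance branch for {case} is not shown")
```

Note that in the backward case without exact matches both bounds are strict, so a right value exactly `tolerance` away from the left value does not match.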






[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926320996


   **[Test build #143571 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143571/testReport)** for PR 34053 at commit [`973b09d`](https://github.com/apache/spark/commit/973b09d5e6d9c2360bfdef5ad4c5e69d6bb929f8).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926265621


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48080/
   




[GitHub] [spark] SparkQA removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926135504


   **[Test build #143569 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143569/testReport)** for PR 34053 at commit [`7932e7d`](https://github.com/apache/spark/commit/7932e7d3e5f1c9ac4e24aaaae4a0fc36461c9a3b).




[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r715126366



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -41,14 +41,18 @@ case class ReferenceEqualPlanWrapper(plan: LogicalPlan) {
 object DeduplicateRelations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     renewDuplicatedRelations(mutable.HashSet.empty, plan)._1.resolveOperatorsUpWithPruning(
-      _.containsAnyPattern(JOIN, LATERAL_JOIN, INTERSECT, EXCEPT, UNION, COMMAND), ruleId) {
+      _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND),
+      ruleId) {
       case p: LogicalPlan if !p.childrenResolved => p
       // To resolve duplicate expression IDs for Join.
       case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
         j.copy(right = dedupRight(left, right))
       // Resolve duplicate output for LateralJoin.
       case j @ LateralJoin(left, right, _, _) if right.resolved && !j.duplicateResolved =>
         j.copy(right = right.withNewPlan(dedupRight(left, right.plan)))
+      // Resolve duplicate output for AsOfJoin.
+      case j @ AsOfJoin(left, right, _, _, _, _) if right.resolved && !j.duplicateResolved =>

Review comment:
       Ah, that's right. Thanks!

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)

Review comment:
       Thanks! Updated. 






[GitHub] [spark] AmplabJenkins commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926353009


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143575/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926223898


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48078/
   




[GitHub] [spark] HyukjinKwon commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-927222757


   Will merge it in a few days if there are no more comments.




[GitHub] [spark] sigmod commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
sigmod commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r714446686



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAsOfJoinSuite.scala
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+class DataFrameAsOfJoinSuite extends QueryTest
+  with SharedSparkSession
+  with AdaptiveSparkPlanHelper {
+
+  def prepareForAsOfJoin(): (DataFrame, DataFrame) = {
+    val schema1 = StructType(
+      StructField("a", IntegerType, false) ::
+        StructField("b", StringType, false) ::
+        StructField("left_val", StringType, false) :: Nil)
+    val rowSeq1: List[Row] = List(Row(1, "x", "a"), Row(5, "y", "b"), Row(10, "z", "c"))
+    val df1 = spark.createDataFrame(rowSeq1.asJava, schema1)
+
+    val schema2 = StructType(
+      StructField("a", IntegerType) ::
+        StructField("b", StringType) ::
+        StructField("right_val", IntegerType) :: Nil)
+    val rowSeq2: List[Row] = List(Row(1, "v", 1), Row(2, "w", 2), Row(3, "x", 3),
+      Row(6, "y", 6), Row(7, "z", 7))
+    val df2 = spark.createDataFrame(rowSeq2.asJava, schema2)
+
+    (df1, df2)
+  }
+
+  test("as-of join - simple") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(
+        df2, df1.col("a"), df2.col("a"), usingColumns = Seq.empty,
+        joinType = "left", tolerance = null, allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", 1, "v", 1),
+        Row(5, "y", "b", 3, "x", 3),
+        Row(10, "z", "c", 7, "z", 7)
+      )
+    )
+  }
+
+  test("as-of join - usingColumns") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq("b"),
+        joinType = "left", tolerance = null, allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", null, null, null),
+        Row(5, "y", "b", null, null, null),
+        Row(10, "z", "c", 7, "z", 7)
+      )
+    )
+  }
+
+  test("as-of join - usingColumns, inner") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq("b"),
+        joinType = "inner", tolerance = null, allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(10, "z", "c", 7, "z", 7)
+      )
+    )
+  }
+
+  test("as-of join - tolerance = 1") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq.empty,
+        joinType = "left", tolerance = lit(1), allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", 1, "v", 1),
+        Row(5, "y", "b", null, null, null),
+        Row(10, "z", "c", null, null, null)
+      )
+    )
+  }
+
+  test("as-of join - allowExactMatches = false") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq.empty,
+        joinType = "left", tolerance = null, allowExactMatches = false, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", null, null, null),

Review comment:
       In the examples in comments, non-matches' numeric columns are NaN?
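
As background for the question above: the suite's expected rows use SQL NULL for left rows without a match, while the pandas-style output in the PR description renders missing float values as `NaN`. The expectations in the quoted hunk can be reproduced with a small plain-Python sketch that uses `None` for NULL. The function `join_as_of` and its parameters are hypothetical stand-ins for the temporary `joinAsOf` API, covering only the backward, left outer cases quoted here.

```python
# Rows copied from prepareForAsOfJoin(): (a, b, left_val) and (a, b, right_val).
left_rows = [(1, "x", "a"), (5, "y", "b"), (10, "z", "c")]
right_rows = [(1, "v", 1), (2, "w", 2), (3, "x", 3), (6, "y", 6), (7, "z", 7)]


def join_as_of(left, right, using_b=False, tolerance=None,
               allow_exact_matches=True):
    """Backward, left outer as-of join on column "a" (tuple index 0)."""
    joined = []
    for l in left:
        candidates = []
        for r in right:
            gap = l[0] - r[0]
            in_range = gap >= 0 if allow_exact_matches else gap > 0
            if tolerance is not None:
                # Strict bound when exact matches are disallowed.
                in_range = in_range and (
                    gap <= tolerance if allow_exact_matches else gap < tolerance)
            if in_range and (not using_b or r[1] == l[1]):
                candidates.append(r)
        # The closest earlier right row has the largest "a"; no match -> NULLs.
        match = max(candidates, key=lambda r: r[0], default=(None, None, None))
        joined.append(l + match)
    return joined


simple = join_as_of(left_rows, right_rows)
using_columns = join_as_of(left_rows, right_rows, using_b=True)
tolerance_one = join_as_of(left_rows, right_rows, tolerance=1)
no_exact = join_as_of(left_rows, right_rows, allow_exact_matches=False)
```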






[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-923491218


   **[Test build #143468 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143468/testReport)** for PR 34053 at commit [`c8237f4`](https://github.com/apache/spark/commit/c8237f46c325a0f95d4793a809d2bcc53cb948c2).




[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-923564848


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47979/
   




[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r715202851



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
+ *                  FROM right
+ *                  WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance

Review comment:
       Added a `CheckAnalysis` rule. https://github.com/apache/spark/pull/34053/commits/20c8281b5f8929e0cb6510456d5acbfb3ea5349f






[GitHub] [spark] SparkQA removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-923491218


   **[Test build #143468 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143468/testReport)** for PR 34053 at commit [`c8237f4`](https://github.com/apache/spark/commit/c8237f46c325a0f95d4793a809d2bcc53cb948c2).




[GitHub] [spark] AmplabJenkins commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-923568239


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/47979/
   




[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926352150


   **[Test build #143575 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143575/testReport)** for PR 34053 at commit [`cd0f707`](https://github.com/apache/spark/commit/cd0f7070b4a504d2aba57d7e4b71fcc225731603).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] sigmod commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and implement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
sigmod commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r714443723



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
+ *                  FROM right
+ *                  WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance
+ *             ) as __right__
+ *        FROM left
+ *        )
+ * }}}
+ */
+object RewriteAsOfJoin extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(AS_OF_JOIN), ruleId) {
+    case AsOfJoin(left, right, asOfCondition, condition, orderExpression, joinType) =>
+      val conditionWithOuterReference =
+        condition.map(And(_, asOfCondition)).getOrElse(asOfCondition).transformUp {
+          case a: AttributeReference if left.outputSet.contains(a) =>
+            OuterReference(a)
+      }
+      val filtered = Filter(conditionWithOuterReference, right)
+
+      val orderExpressionWithOuterReference = orderExpression.transformUp {
+          case a: AttributeReference if left.outputSet.contains(a) =>
+            OuterReference(a)
+        }
+      val rightStruct = CreateStruct(right.output)
+      val nearestRight = MinBy(rightStruct, orderExpressionWithOuterReference)
+        .toAggregateExpression()
+      val aggExpr = Alias(nearestRight, "__nearest_right__")()
+      val aggregate = Aggregate(Seq.empty, Seq(aggExpr), filtered)
+
+      val scalarSubquery = Project(

Review comment:
       Nit: rename this to `projectWithScalarSubquery`?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAsOfJoinSuite.scala
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+class DataFrameAsOfJoinSuite extends QueryTest
+  with SharedSparkSession
+  with AdaptiveSparkPlanHelper {
+
+  def prepareForAsOfJoin(): (DataFrame, DataFrame) = {
+    val schema1 = StructType(
+      StructField("a", IntegerType, false) ::
+        StructField("b", StringType, false) ::
+        StructField("left_val", StringType, false) :: Nil)
+    val rowSeq1: List[Row] = List(Row(1, "x", "a"), Row(5, "y", "b"), Row(10, "z", "c"))
+    val df1 = spark.createDataFrame(rowSeq1.asJava, schema1)
+
+    val schema2 = StructType(
+      StructField("a", IntegerType) ::
+        StructField("b", StringType) ::
+        StructField("right_val", IntegerType) :: Nil)
+    val rowSeq2: List[Row] = List(Row(1, "v", 1), Row(2, "w", 2), Row(3, "x", 3),
+      Row(6, "y", 6), Row(7, "z", 7))
+    val df2 = spark.createDataFrame(rowSeq2.asJava, schema2)
+
+    (df1, df2)
+  }
+
+  test("as-of join - simple") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(
+        df2, df1.col("a"), df2.col("a"), usingColumns = Seq.empty,
+        joinType = "left", tolerance = null, allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", 1, "v", 1),
+        Row(5, "y", "b", 3, "x", 3),
+        Row(10, "z", "c", 7, "z", 7)
+      )
+    )
+  }
+
+  test("as-of join - usingColumns") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq("b"),
+        joinType = "left", tolerance = null, allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", null, null, null),
+        Row(5, "y", "b", null, null, null),
+        Row(10, "z", "c", 7, "z", 7)
+      )
+    )
+  }
+
+  test("as-of join - usingColumns, inner") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq("b"),
+        joinType = "inner", tolerance = null, allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(10, "z", "c", 7, "z", 7)
+      )
+    )
+  }
+
+  test("as-of join - tolerance = 1") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq.empty,
+        joinType = "left", tolerance = lit(1), allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", 1, "v", 1),
+        Row(5, "y", "b", null, null, null),
+        Row(10, "z", "c", null, null, null)
+      )
+    )
+  }
+
+  test("as-of join - allowExactMatches = false") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq.empty,
+        joinType = "left", tolerance = null, allowExactMatches = false, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", null, null, null),

Review comment:
       In the example, the numeric columns of non-matching rows are NaN?
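
For reference, the strict backward case exercised by this test can be sketched in plain Python. This is only an illustration under stated assumptions, not the Spark implementation: `asof_backward_strict` is a hypothetical helper, and the keys mirror `prepareForAsOfJoin` above. A left row with no strictly smaller right key gets `None`, which corresponds to the `null` values in the Scala test; the `merge_asof` docstring in this PR shows the same position as `NaN`.

```python
from typing import List, Optional, Sequence, Tuple


def asof_backward_strict(
    left_keys: Sequence[int], right_keys: Sequence[int]
) -> List[Tuple[int, Optional[int]]]:
    """Hypothetical helper: for each left key, pick the largest right key strictly below it."""
    result = []
    for lk in left_keys:
        # allowExactMatches = false means equal keys are excluded (strict "<").
        candidates = [rk for rk in right_keys if rk < lk]
        # No candidate means no match, represented here as None.
        result.append((lk, max(candidates) if candidates else None))
    return result


matches = asof_backward_strict([1, 5, 10], [1, 2, 3, 6, 7])
print(matches)
```

The first left key has no right key strictly below it, so it is the only row without a match.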




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926135504


   **[Test build #143569 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143569/testReport)** for PR 34053 at commit [`7932e7d`](https://github.com/apache/spark/commit/7932e7d3e5f1c9ac4e24aaaae4a0fc36461c9a3b).




[GitHub] [spark] HyukjinKwon closed pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
HyukjinKwon closed pull request #34053:
URL: https://github.com/apache/spark/pull/34053


   




[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926245086


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48080/
   




[GitHub] [spark] SparkQA removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926224948


   **[Test build #143571 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143571/testReport)** for PR 34053 at commit [`973b09d`](https://github.com/apache/spark/commit/973b09d5e6d9c2360bfdef5ad4c5e69d6bb929f8).




[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926276699


   **[Test build #143569 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143569/testReport)** for PR 34053 at commit [`7932e7d`](https://github.com/apache/spark/commit/7932e7d3e5f1c9ac4e24aaaae4a0fc36461c9a3b).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926325603


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143571/
   




[GitHub] [spark] cloud-fan commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716507165



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteAsOfJoin.scala
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.trees.TreePattern._
+
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t) AS __nearest_right__
+ *                  FROM right
+ *                  WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance

Review comment:
       Is `right.t >= left.t - tolerance` only a performance optimization? It doesn't seem to affect the query result.
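
One way to check this question is to model the rewritten subquery in plain Python. The following is a minimal sketch, assuming integer keys and the backward direction with exact matches allowed; `asof_rewrite` is a hypothetical name and this is not how Spark evaluates the plan. With the keys from `DataFrameAsOfJoinSuite`, the tolerance bound changes the output, which is consistent with the `tolerance = 1` test in this PR.

```python
from typing import List, Optional, Sequence, Tuple


def asof_rewrite(
    left_ts: Sequence[int],
    right_ts: Sequence[int],
    tolerance: Optional[int] = None,
) -> List[Tuple[int, Optional[int]]]:
    """Hypothetical model of the rewritten scalar subquery (backward, exact matches allowed)."""
    out = []
    for lt in left_ts:
        # WHERE left.t >= right.t [AND right.t >= left.t - tolerance]
        candidates = [
            rt
            for rt in right_ts
            if lt >= rt and (tolerance is None or rt >= lt - tolerance)
        ]
        # MIN_BY(right, left.t - right.t); None when no row survives the filter.
        nearest = min(candidates, key=lambda rt: lt - rt, default=None)
        out.append((lt, nearest))
    return out


without_tolerance = asof_rewrite([1, 5, 10], [1, 2, 3, 6, 7])
with_tolerance = asof_rewrite([1, 5, 10], [1, 2, 3, 6, 7], tolerance=1)
print(without_tolerance)
print(with_tolerance)
```

In this model the left keys 5 and 10 lose their matches once the tolerance is 1, because the nearest earlier right keys (3 and 7) fall outside the allowed window.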






[GitHub] [spark] AmplabJenkins commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926277456


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143569/
   




[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-923513204


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/47979/
   




[GitHub] [spark] HyukjinKwon commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-929720849


   Merged to master.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-923568239


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/47979/
   




[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716999204



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case _ =>
+        left.output ++ right.output
+    }
+  }
+
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+  override lazy val resolved: Boolean = {
+    childrenResolved &&
+      expressions.forall(_.resolved) &&
+      duplicateResolved &&
+      asOfCondition.dataType == BooleanType &&
+      condition.forall(_.dataType == BooleanType) &&
+      toleranceAssertion.forall { assertion =>
+        assertion.foldable && assertion.eval().asInstanceOf[Boolean]
+      }
+  }
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(AS_OF_JOIN)
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): AsOfJoin = {
+    copy(left = newLeft, right = newRight)
+  }
+}
+
+object AsOfJoin {
+
+  def apply(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      condition: Option[Expression],
+      joinType: JoinType,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): AsOfJoin = {
+    val asOfCond = makeAsOfCond(leftAsOf, rightAsOf, tolerance, allowExactMatches, direction)
+    val orderingExpr = makeOrderingExpr(leftAsOf, rightAsOf, direction)
+    AsOfJoin(left, right, asOfCond, condition, joinType,
+      orderingExpr, tolerance.map(t => GreaterThanOrEqual(t, Literal.default(t.dataType))))
+  }
+
+  private def makeAsOfCond(
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): Expression = {
+    val base = (allowExactMatches, direction) match {
+      case (true, Backward) => GreaterThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Backward) => GreaterThan(leftAsOf, rightAsOf)
+      case (true, Forward) => LessThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Forward) => LessThan(leftAsOf, rightAsOf)
+      case (true, Nearest) => Literal.TrueLiteral
+      case (false, Nearest) => Not(EqualTo(leftAsOf, rightAsOf))
+    }
+    tolerance match {
+      case Some(tolerance) =>
+        (allowExactMatches, direction) match {
+          case (true, Backward) =>
+            And(base, GreaterThanOrEqual(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (false, Backward) =>
+            And(base, GreaterThan(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (true, Forward) =>
+            And(base, LessThanOrEqual(rightAsOf, Add(leftAsOf, tolerance)))

Review comment:
       I think we can leave it as-is because the `Analyzer` should replace it with a proper expression; otherwise we would have to handle it by hand here as well.
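
The base predicate that `makeAsOfCond` selects in the hunk above, before any tolerance bound is added, can be read as a lookup table keyed by `(allowExactMatches, direction)`. The sketch below restates that table in plain Python for illustration only; `base_condition` and the lowercase direction strings are assumptions of this sketch, not names from the patch.

```python
import operator
from typing import Callable


def base_condition(allow_exact_matches: bool, direction: str) -> Callable[[int, int], bool]:
    """Hypothetical helper: return a predicate over (left_key, right_key)."""
    table = {
        (True, "backward"): operator.ge,   # left >= right
        (False, "backward"): operator.gt,  # left > right
        (True, "forward"): operator.le,    # left <= right
        (False, "forward"): operator.lt,   # left < right
        (True, "nearest"): lambda left, right: True,  # every right row is a candidate
        (False, "nearest"): operator.ne,   # only equal keys are excluded
    }
    return table[(allow_exact_matches, direction)]


backward_exact = base_condition(True, "backward")
backward_strict = base_condition(False, "backward")
print(backward_exact(5, 5), backward_strict(5, 5))
```

Equal keys pass the first predicate and fail the second, which is the only difference `allowExactMatches` makes to the base condition.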






[GitHub] [spark] AmplabJenkins commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926288171


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48085/
   




[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926252459


   **[Test build #143575 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143575/testReport)** for PR 34053 at commit [`cd0f707`](https://github.com/apache/spark/commit/cd0f7070b4a504d2aba57d7e4b71fcc225731603).




[GitHub] [spark] AmplabJenkins commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926325603


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143571/
   




[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716998942



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)

Review comment:
       I don't think they are useful in explain output because they are for internal use only.
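
The effect of `stringArgs.take(5)` can be pictured with a small Python analogy: a node whose printed form covers only its first five fields and hides the two internal ones. This is a rough sketch and not Spark code; `AsOfJoinNode` and its string-typed fields are hypothetical stand-ins for the case class in the hunk above.

```python
from dataclasses import dataclass, fields


@dataclass(repr=False)
class AsOfJoinNode:
    """Hypothetical stand-in for the AsOfJoin case class, with strings for all fields."""

    left: str
    right: str
    as_of_condition: str
    condition: str
    join_type: str
    order_expression: str      # internal, hidden from the printed form
    tolerance_assertion: str   # internal, hidden from the printed form

    def __repr__(self) -> str:
        # Show only the first five fields, analogous to stringArgs.take(5).
        shown = [getattr(self, f.name) for f in fields(self)[:5]]
        return "AsOfJoin(" + ", ".join(shown) + ")"


node = AsOfJoinNode("l", "r", "l.a >= r.a", "l.b = r.b", "LeftOuter", "l.a - r.a", "1 >= 0")
print(repr(node))
```

The hidden fields are still stored on the node and remain available to the rewrite; they are only omitted from the printed form.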

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -249,6 +249,20 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
               s"join condition '${condition.sql}' " +
                 s"of type ${condition.dataType.catalogString} is not a boolean.")
 
+          case j @ AsOfJoin(_, _, _, Some(condition), _, _, _)
+              if condition.dataType != BooleanType =>
+            failAnalysis(
+              s"join condition '${condition.sql}' " +
+                s"of type ${condition.dataType.catalogString} is not a boolean.")
+
+          case j @ AsOfJoin(_, _, _, _, _, _, Some(toleranceAssertion)) =>
+            if (!toleranceAssertion.foldable) {
+              failAnalysis("Input argument tolerance must be a constant.")
+            }
+            if (!toleranceAssertion.eval().asInstanceOf[Boolean]) {

Review comment:
       That makes sense, but the result of `eval()` is type-dependent. For example, if the `tolerance` is a `CalendarInterval`, the cast will fail, IIUC, whereas if we build the condition from expressions, it will only return a `Boolean` value after the analysis phase. WDYT?
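
The type-dependence argument can be illustrated outside Spark. In the `AsOfJoin.apply` hunk quoted earlier, the assertion is built as `GreaterThanOrEqual(t, Literal.default(t.dataType))`, so the check itself always yields a boolean whatever the type of the tolerance. The sketch below mimics that idea in Python, assuming the type's zero-argument constructor as a stand-in for the default literal; `tolerance_is_valid` is a hypothetical name.

```python
from datetime import timedelta


def tolerance_is_valid(tolerance) -> bool:
    """Hypothetical check: the tolerance must not be below the zero value of its own type."""
    # int() == 0, float() == 0.0, timedelta() == timedelta(0)
    zero = type(tolerance)()
    # The comparison returns a bool for every supported type, so no cast of the
    # tolerance value itself is needed.
    return tolerance >= zero


print(tolerance_is_valid(1), tolerance_is_valid(timedelta(seconds=-1)))
```

The same comparison works for an integer and for a duration, which is the property the expression-based assertion relies on.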

##########
File path: python/pyspark/pandas/namespace.py
##########
@@ -2747,6 +2751,499 @@ def merge(
     )
 
 
+def merge_asof(
+    left: Union[DataFrame, Series],
+    right: Union[DataFrame, Series],
+    on: Optional[Name] = None,
+    left_on: Optional[Name] = None,
+    right_on: Optional[Name] = None,
+    left_index: bool = False,
+    right_index: bool = False,
+    by: Optional[Union[Name, List[Name]]] = None,
+    left_by: Optional[Union[Name, List[Name]]] = None,
+    right_by: Optional[Union[Name, List[Name]]] = None,
+    suffixes: Tuple[str, str] = ("_x", "_y"),
+    tolerance: Optional[Any] = None,
+    allow_exact_matches: bool = True,
+    direction: str = "backward",
+) -> DataFrame:
+    """
+    Perform an asof merge.
+
+    This is similar to a left-join except that we match on nearest
+    key rather than equal keys.
+
+    For each row in the left DataFrame:
+
+      - A "backward" search selects the last row in the right DataFrame whose
+        'on' key is less than or equal to the left's key.
+
+      - A "forward" search selects the first row in the right DataFrame whose
+        'on' key is greater than or equal to the left's key.
+
+      - A "nearest" search selects the row in the right DataFrame whose 'on'
+        key is closest in absolute distance to the left's key.
+
+    Optionally match on equivalent keys with 'by' before searching with 'on'.
+
+    .. versionadded:: 3.3.0
+
+    Parameters
+    ----------
+    left : DataFrame or named Series
+    right : DataFrame or named Series
+    on : label
+        Field name to join on. Must be found in both DataFrames.
+        The data MUST be ordered. Furthermore this must be a numeric column,
+        such as datetimelike, integer, or float. On or left_on/right_on
+        must be given.
+    left_on : label
+        Field name to join on in left DataFrame.
+    right_on : label
+        Field name to join on in right DataFrame.
+    left_index : bool
+        Use the index of the left DataFrame as the join key.
+    right_index : bool
+        Use the index of the right DataFrame as the join key.
+    by : column name or list of column names
+        Match on these columns before performing merge operation.
+    left_by : column name
+        Field names to match on in the left DataFrame.
+    right_by : column name
+        Field names to match on in the right DataFrame.
+    suffixes : 2-length sequence (tuple, list, ...)
+        Suffix to apply to overlapping column names in the left and right
+        side, respectively.
+    tolerance : int or Timedelta, optional, default None
+        Select asof tolerance within this range; must be compatible
+        with the merge index.
+    allow_exact_matches : bool, default True
+
+        - If True, allow matching with the same 'on' value
+          (i.e. less-than-or-equal-to / greater-than-or-equal-to)
+        - If False, don't match the same 'on' value
+          (i.e., strictly less-than / strictly greater-than).
+
+    direction : 'backward' (default), 'forward', or 'nearest'
+        Whether to search for prior, subsequent, or closest matches.
+
+    Returns
+    -------
+    merged : DataFrame
+
+    See Also
+    --------
+    merge : Merge with a database-style join.
+    merge_ordered : Merge with optional filling/interpolation.
+
+    Examples
+    --------
+    >>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
+    >>> left
+        a left_val
+    0   1        a
+    1   5        b
+    2  10        c
+
+    >>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})
+    >>> right
+       a  right_val
+    0  1          1
+    1  2          2
+    2  3          3
+    3  6          6
+    4  7          7
+
+    >>> ps.merge_asof(left, right, on="a").sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          3
+    2  10        c          7
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     allow_exact_matches=False
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        NaN
+    1   5        b        3.0
+    2  10        c        7.0
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="forward"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        1.0
+    1   5        b        6.0
+    2  10        c        NaN
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="nearest"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          6
+    2  10        c          7
+
+    We can use indexed DataFrames as well.
+
+    >>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
+    >>> left
+       left_val
+    1         a
+    5         b
+    10        c
+
+    >>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 6, 7])
+    >>> right
+       right_val
+    1          1
+    2          2
+    3          3
+    6          6
+    7          7
+
+    >>> ps.merge_asof(left, right, left_index=True, right_index=True).sort_index()
+       left_val  right_val
+    1         a          1
+    5         b          3
+    10        c          7
+
+    Here is a real-world time-series example
+
+    >>> quotes = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.030"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.041"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.049"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.072"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.075")
+    ...         ],
+    ...         "ticker": [
+    ...                "GOOG",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "GOOG",
+    ...                "AAPL",
+    ...                "GOOG",
+    ...                "MSFT"
+    ...            ],
+    ...            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
+    ...            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03]
+    ...     }
+    ... )
+    >>> quotes
+                         time ticker     bid     ask
+    0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
+    1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
+    2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
+    3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
+    4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
+    5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
+    6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
+    7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
+
+    >>> trades = ps.DataFrame(
+    ...        {
+    ...            "time": [
+    ...                pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.038"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048")
+    ...            ],
+    ...            "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
+    ...            "price": [51.95, 51.95, 720.77, 720.92, 98.0],
+    ...            "quantity": [75, 155, 100, 100, 100]
+    ...        }
+    ...    )
+    >>> trades
+                         time ticker   price  quantity
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155
+    2 2016-05-25 13:30:00.048   GOOG  720.77       100
+    3 2016-05-25 13:30:00.048   GOOG  720.92       100
+    4 2016-05-25 13:30:00.048   AAPL   98.00       100
+
+    By default we are taking the asof of the quotes
+
+    >>> ps.merge_asof(
+    ...    trades, quotes, on="time", by="ticker"
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 2ms between the quote time and the trade time
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 10ms between the quote time and the trade time
+    and we exclude exact matches on time. However *prior* data will
+    propagate forward
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 10 MILLISECONDS"),  # pd.Timedelta("10ms")

Review comment:
       Yes, it should be handled on the pyspark side, and it's still WIP [SPARK-33054](https://issues.apache.org/jira/browse/SPARK-33054). We can remove this workaround when it's done.

##########
File path: python/pyspark/pandas/namespace.py
##########
@@ -2747,6 +2751,499 @@ def merge(
     )
 
 
+def merge_asof(
+    left: Union[DataFrame, Series],
+    right: Union[DataFrame, Series],
+    on: Optional[Name] = None,
+    left_on: Optional[Name] = None,
+    right_on: Optional[Name] = None,
+    left_index: bool = False,
+    right_index: bool = False,
+    by: Optional[Union[Name, List[Name]]] = None,
+    left_by: Optional[Union[Name, List[Name]]] = None,
+    right_by: Optional[Union[Name, List[Name]]] = None,
+    suffixes: Tuple[str, str] = ("_x", "_y"),
+    tolerance: Optional[Any] = None,
+    allow_exact_matches: bool = True,
+    direction: str = "backward",
+) -> DataFrame:
+    """
+    Perform an asof merge.
+
+    This is similar to a left-join except that we match on nearest
+    key rather than equal keys.
+
+    For each row in the left DataFrame:
+
+      - A "backward" search selects the last row in the right DataFrame whose
+        'on' key is less than or equal to the left's key.
+
+      - A "forward" search selects the first row in the right DataFrame whose
+        'on' key is greater than or equal to the left's key.
+
+      - A "nearest" search selects the row in the right DataFrame whose 'on'
+        key is closest in absolute distance to the left's key.
+
+    Optionally match on equivalent keys with 'by' before searching with 'on'.
+
+    .. versionadded:: 3.3.0
+
+    Parameters
+    ----------
+    left : DataFrame or named Series
+    right : DataFrame or named Series
+    on : label
+        Field name to join on. Must be found in both DataFrames.
+        The data MUST be ordered. Furthermore this must be a numeric column,
+        such as datetimelike, integer, or float. On or left_on/right_on
+        must be given.
+    left_on : label
+        Field name to join on in left DataFrame.
+    right_on : label
+        Field name to join on in right DataFrame.
+    left_index : bool
+        Use the index of the left DataFrame as the join key.
+    right_index : bool
+        Use the index of the right DataFrame as the join key.
+    by : column name or list of column names
+        Match on these columns before performing merge operation.
+    left_by : column name
+        Field names to match on in the left DataFrame.
+    right_by : column name
+        Field names to match on in the right DataFrame.
+    suffixes : 2-length sequence (tuple, list, ...)
+        Suffix to apply to overlapping column names in the left and right
+        side, respectively.
+    tolerance : int or Timedelta, optional, default None
+        Select asof tolerance within this range; must be compatible
+        with the merge index.
+    allow_exact_matches : bool, default True
+
+        - If True, allow matching with the same 'on' value
+          (i.e. less-than-or-equal-to / greater-than-or-equal-to)
+        - If False, don't match the same 'on' value
+          (i.e., strictly less-than / strictly greater-than).
+
+    direction : 'backward' (default), 'forward', or 'nearest'
+        Whether to search for prior, subsequent, or closest matches.
+
+    Returns
+    -------
+    merged : DataFrame
+
+    See Also
+    --------
+    merge : Merge with a database-style join.
+    merge_ordered : Merge with optional filling/interpolation.
+
+    Examples
+    --------
+    >>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
+    >>> left
+        a left_val
+    0   1        a
+    1   5        b
+    2  10        c
+
+    >>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})
+    >>> right
+       a  right_val
+    0  1          1
+    1  2          2
+    2  3          3
+    3  6          6
+    4  7          7
+
+    >>> ps.merge_asof(left, right, on="a").sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          3
+    2  10        c          7
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     allow_exact_matches=False
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        NaN
+    1   5        b        3.0
+    2  10        c        7.0
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="forward"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        1.0
+    1   5        b        6.0
+    2  10        c        NaN
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="nearest"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          6
+    2  10        c          7
+
+    We can use indexed DataFrames as well.
+
+    >>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
+    >>> left
+       left_val
+    1         a
+    5         b
+    10        c
+
+    >>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 6, 7])
+    >>> right
+       right_val
+    1          1
+    2          2
+    3          3
+    6          6
+    7          7
+
+    >>> ps.merge_asof(left, right, left_index=True, right_index=True).sort_index()
+       left_val  right_val
+    1         a          1
+    5         b          3
+    10        c          7
+
+    Here is a real-world time-series example
+
+    >>> quotes = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.030"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.041"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.049"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.072"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.075")
+    ...         ],
+    ...         "ticker": [
+    ...                "GOOG",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "GOOG",
+    ...                "AAPL",
+    ...                "GOOG",
+    ...                "MSFT"
+    ...            ],
+    ...            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
+    ...            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03]
+    ...     }
+    ... )
+    >>> quotes
+                         time ticker     bid     ask
+    0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
+    1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
+    2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
+    3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
+    4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
+    5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
+    6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
+    7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
+
+    >>> trades = ps.DataFrame(
+    ...        {
+    ...            "time": [
+    ...                pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.038"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048")
+    ...            ],
+    ...            "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
+    ...            "price": [51.95, 51.95, 720.77, 720.92, 98.0],
+    ...            "quantity": [75, 155, 100, 100, 100]
+    ...        }
+    ...    )
+    >>> trades
+                         time ticker   price  quantity
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155
+    2 2016-05-25 13:30:00.048   GOOG  720.77       100
+    3 2016-05-25 13:30:00.048   GOOG  720.92       100
+    4 2016-05-25 13:30:00.048   AAPL   98.00       100
+
+    By default we are taking the asof of the quotes
+
+    >>> ps.merge_asof(
+    ...    trades, quotes, on="time", by="ticker"
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 2ms between the quote time and the trade time
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 10ms between the quote time and the trade time
+    and we exclude exact matches on time. However *prior* data will
+    propagate forward
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 10 MILLISECONDS"),  # pd.Timedelta("10ms")

Review comment:
       Yes, it should be handled in pyspark side, and it's still WIP [SPARK-33054](https://issues.apache.org/jira/browse/SPARK-33054). We can remove this workaround when it's done.
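
For readers following the quoted docstring, the three search directions and `allow_exact_matches` can be illustrated with a small pure-Python reference. This is only a sketch of the semantics as the docstring describes them, not the code in this PR: the function name `asof_match` and its argument layout are hypothetical, and the tie-breaking rule for "nearest" (first right row wins) is an assumption, since the docstring does not specify one.

```python
from typing import List, Optional, Sequence, Tuple


def asof_match(
    left_keys: Sequence[int],
    right_rows: Sequence[Tuple[int, int]],
    direction: str = "backward",
    allow_exact_matches: bool = True,
) -> List[Optional[int]]:
    """For each left key, return the matched right value, or None if no row qualifies.

    right_rows is a sequence of (key, value) pairs. Hypothetical helper for illustration.
    """
    if direction not in ("backward", "forward", "nearest"):
        raise ValueError(f"unsupported direction: {direction}")
    result: List[Optional[int]] = []
    for left_key in left_keys:
        best: Optional[Tuple[int, int]] = None  # (distance, right value)
        for right_key, right_value in right_rows:
            diff = left_key - right_key  # positive when the right key is smaller
            if diff == 0 and not allow_exact_matches:
                continue  # exact matches are excluded on request
            if direction == "backward" and diff < 0:
                continue  # only right keys at or before the left key
            if direction == "forward" and diff > 0:
                continue  # only right keys at or after the left key
            distance = abs(diff)
            # Strict comparison: on a tie the first right row seen is kept (assumption).
            if best is None or distance < best[0]:
                best = (distance, right_value)
        result.append(None if best is None else best[1])
    return result


# Data from the first docstring example.
left = [1, 5, 10]
right = [(1, 1), (2, 2), (3, 3), (6, 6), (7, 7)]

backward = asof_match(left, right)
backward_strict = asof_match(left, right, allow_exact_matches=False)
forward = asof_match(left, right, direction="forward")
nearest = asof_match(left, right, direction="nearest")
```

With the docstring's `left` and `right` data, the four calls reproduce the `right_val` columns shown in the quoted examples, with `None` standing in for `NaN`.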

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case _ =>
+        left.output ++ right.output
+    }
+  }
+
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+  override lazy val resolved: Boolean = {
+    childrenResolved &&
+      expressions.forall(_.resolved) &&
+      duplicateResolved &&
+      asOfCondition.dataType == BooleanType &&
+      condition.forall(_.dataType == BooleanType) &&
+      toleranceAssertion.forall { assertion =>
+        assertion.foldable && assertion.eval().asInstanceOf[Boolean]
+      }
+  }
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(AS_OF_JOIN)
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): AsOfJoin = {
+    copy(left = newLeft, right = newRight)
+  }
+}
+
+object AsOfJoin {
+
+  def apply(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      condition: Option[Expression],
+      joinType: JoinType,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): AsOfJoin = {
+    val asOfCond = makeAsOfCond(leftAsOf, rightAsOf, tolerance, allowExactMatches, direction)
+    val orderingExpr = makeOrderingExpr(leftAsOf, rightAsOf, direction)
+    AsOfJoin(left, right, asOfCond, condition, joinType,
+      orderingExpr, tolerance.map(t => GreaterThanOrEqual(t, Literal.default(t.dataType))))
+  }
+
+  private def makeAsOfCond(
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): Expression = {
+    val base = (allowExactMatches, direction) match {
+      case (true, Backward) => GreaterThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Backward) => GreaterThan(leftAsOf, rightAsOf)
+      case (true, Forward) => LessThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Forward) => LessThan(leftAsOf, rightAsOf)
+      case (true, Nearest) => Literal.TrueLiteral
+      case (false, Nearest) => Not(EqualTo(leftAsOf, rightAsOf))
+    }
+    tolerance match {
+      case Some(tolerance) =>
+        (allowExactMatches, direction) match {
+          case (true, Backward) =>
+            And(base, GreaterThanOrEqual(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (false, Backward) =>
+            And(base, GreaterThan(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (true, Forward) =>
+            And(base, LessThanOrEqual(rightAsOf, Add(leftAsOf, tolerance)))

Review comment:
       I think we can leave it as-is because `Analyzer` should replace it with a proper expression; otherwise we should manage by hand here as well.
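
To make the quoted `makeAsOfCond` hunk easier to follow, here is a rough Python transliteration of the predicate it builds, evaluated on plain values instead of Catalyst expressions. It is a sketch under assumptions: `as_of_condition` and its parameters are hypothetical names, and only the tolerance cases visible in the quoted diff are covered. The remaining cases are cut off in the hunk above, so the sketch raises `NotImplementedError` for them rather than guessing.

```python
def as_of_condition(left_t, right_t, direction="backward",
                    allow_exact_matches=True, tolerance=None):
    """Return True if a right row with key right_t may match a left row with key left_t."""
    key = (allow_exact_matches, direction)
    # Base comparison, mirroring the first match block in the quoted Scala code.
    base = {
        (True, "backward"): left_t >= right_t,
        (False, "backward"): left_t > right_t,
        (True, "forward"): left_t <= right_t,
        (False, "forward"): left_t < right_t,
        (True, "nearest"): True,
        (False, "nearest"): left_t != right_t,
    }[key]
    if tolerance is None:
        return base
    # Tolerance bounds, only for the cases that appear in the quoted diff.
    if key == (True, "backward"):
        return base and right_t >= left_t - tolerance
    if key == (False, "backward"):
        return base and right_t > left_t - tolerance
    if key == (True, "forward"):
        return base and right_t <= left_t + tolerance
    # The quoted hunk is truncated before these cases; they are deliberately not guessed.
    raise NotImplementedError("tolerance bound not shown in the quoted diff")
```

Note that in the strict backward case the tolerance bound is also strict, so a right row exactly `tolerance` away is rejected when `allow_exact_matches` is false.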




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926283610


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48085/
   




[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716999030



##########
File path: python/pyspark/pandas/namespace.py
##########
@@ -2747,6 +2751,499 @@ def merge(
     )
 
 
+def merge_asof(
+    left: Union[DataFrame, Series],
+    right: Union[DataFrame, Series],
+    on: Optional[Name] = None,
+    left_on: Optional[Name] = None,
+    right_on: Optional[Name] = None,
+    left_index: bool = False,
+    right_index: bool = False,
+    by: Optional[Union[Name, List[Name]]] = None,
+    left_by: Optional[Union[Name, List[Name]]] = None,
+    right_by: Optional[Union[Name, List[Name]]] = None,
+    suffixes: Tuple[str, str] = ("_x", "_y"),
+    tolerance: Optional[Any] = None,
+    allow_exact_matches: bool = True,
+    direction: str = "backward",
+) -> DataFrame:
+    """
+    Perform an asof merge.
+
+    This is similar to a left-join except that we match on nearest
+    key rather than equal keys.
+
+    For each row in the left DataFrame:
+
+      - A "backward" search selects the last row in the right DataFrame whose
+        'on' key is less than or equal to the left's key.
+
+      - A "forward" search selects the first row in the right DataFrame whose
+        'on' key is greater than or equal to the left's key.
+
+      - A "nearest" search selects the row in the right DataFrame whose 'on'
+        key is closest in absolute distance to the left's key.
+
+    Optionally match on equivalent keys with 'by' before searching with 'on'.
+
+    .. versionadded:: 3.3.0
+
+    Parameters
+    ----------
+    left : DataFrame or named Series
+    right : DataFrame or named Series
+    on : label
+        Field name to join on. Must be found in both DataFrames.
+        The data MUST be ordered. Furthermore this must be a numeric column,
+        such as datetimelike, integer, or float. On or left_on/right_on
+        must be given.
+    left_on : label
+        Field name to join on in left DataFrame.
+    right_on : label
+        Field name to join on in right DataFrame.
+    left_index : bool
+        Use the index of the left DataFrame as the join key.
+    right_index : bool
+        Use the index of the right DataFrame as the join key.
+    by : column name or list of column names
+        Match on these columns before performing merge operation.
+    left_by : column name
+        Field names to match on in the left DataFrame.
+    right_by : column name
+        Field names to match on in the right DataFrame.
+    suffixes : 2-length sequence (tuple, list, ...)
+        Suffix to apply to overlapping column names in the left and right
+        side, respectively.
+    tolerance : int or Timedelta, optional, default None
+        Select asof tolerance within this range; must be compatible
+        with the merge index.
+    allow_exact_matches : bool, default True
+
+        - If True, allow matching with the same 'on' value
+          (i.e. less-than-or-equal-to / greater-than-or-equal-to)
+        - If False, don't match the same 'on' value
+          (i.e., strictly less-than / strictly greater-than).
+
+    direction : 'backward' (default), 'forward', or 'nearest'
+        Whether to search for prior, subsequent, or closest matches.
+
+    Returns
+    -------
+    merged : DataFrame
+
+    See Also
+    --------
+    merge : Merge with a database-style join.
+    merge_ordered : Merge with optional filling/interpolation.
+
+    Examples
+    --------
+    >>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
+    >>> left
+        a left_val
+    0   1        a
+    1   5        b
+    2  10        c
+
+    >>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})
+    >>> right
+       a  right_val
+    0  1          1
+    1  2          2
+    2  3          3
+    3  6          6
+    4  7          7
+
+    >>> ps.merge_asof(left, right, on="a").sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          3
+    2  10        c          7
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     allow_exact_matches=False
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        NaN
+    1   5        b        3.0
+    2  10        c        7.0
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="forward"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        1.0
+    1   5        b        6.0
+    2  10        c        NaN
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="nearest"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          6
+    2  10        c          7
+
+    We can use indexed DataFrames as well.
+
+    >>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
+    >>> left
+       left_val
+    1         a
+    5         b
+    10        c
+
+    >>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 6, 7])
+    >>> right
+       right_val
+    1          1
+    2          2
+    3          3
+    6          6
+    7          7
+
+    >>> ps.merge_asof(left, right, left_index=True, right_index=True).sort_index()
+       left_val  right_val
+    1         a          1
+    5         b          3
+    10        c          7
+
+    Here is a real-world time-series example
+
+    >>> quotes = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.030"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.041"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.049"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.072"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.075")
+    ...         ],
+    ...         "ticker": [
+    ...                "GOOG",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "GOOG",
+    ...                "AAPL",
+    ...                "GOOG",
+    ...                "MSFT"
+    ...            ],
+    ...            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
+    ...            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03]
+    ...     }
+    ... )
+    >>> quotes
+                         time ticker     bid     ask
+    0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
+    1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
+    2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
+    3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
+    4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
+    5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
+    6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
+    7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
+
+    >>> trades = ps.DataFrame(
+    ...        {
+    ...            "time": [
+    ...                pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.038"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048")
+    ...            ],
+    ...            "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
+    ...            "price": [51.95, 51.95, 720.77, 720.92, 98.0],
+    ...            "quantity": [75, 155, 100, 100, 100]
+    ...        }
+    ...    )
+    >>> trades
+                         time ticker   price  quantity
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155
+    2 2016-05-25 13:30:00.048   GOOG  720.77       100
+    3 2016-05-25 13:30:00.048   GOOG  720.92       100
+    4 2016-05-25 13:30:00.048   AAPL   98.00       100
+
+    By default we are taking the asof of the quotes
+
+    >>> ps.merge_asof(
+    ...    trades, quotes, on="time", by="ticker"
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 2ms between the quote time and the trade time
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 10ms between the quote time and the trade time
+    and we exclude exact matches on time. However *prior* data will
+    propagate forward
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 10 MILLISECONDS"),  # pd.Timedelta("10ms")

Review comment:
       Yes, it should be handled on the PySpark side, but that work is still in progress ([SPARK-33054](https://issues.apache.org/jira/browse/SPARK-33054)). We can remove this workaround once it's done.
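
       For illustration only, here is a minimal sketch of how the `F.expr("INTERVAL ...")` workaround could be wrapped on the Python side in the meantime. The helper name `timedelta_to_interval_sql` is hypothetical and is not part of this PR or of PySpark; the non-negativity check is also an assumption. It only builds the interval literal string, so it runs without Spark.

```python
from datetime import timedelta


def timedelta_to_interval_sql(tolerance: timedelta) -> str:
    """Hypothetical helper: render a timedelta as a Spark SQL interval literal."""
    # Assumption: a negative tolerance is rejected up front.
    if tolerance < timedelta(0):
        raise ValueError("tolerance must be non-negative")
    # Floor-dividing by one microsecond yields the whole number of microseconds.
    micros = tolerance // timedelta(microseconds=1)
    return f"INTERVAL {micros} MICROSECONDS"


# Usage with the docstring example (needs a Spark session, so it stays a comment):
#   from pyspark.sql import functions as F
#   ps.merge_asof(
#       trades, quotes, on="time", by="ticker",
#       tolerance=F.expr(timedelta_to_interval_sql(timedelta(milliseconds=2))),
#   )
```

       Since `pd.Timedelta` subclasses `datetime.timedelta`, the same helper should accept it as well, although any sub-microsecond part would be dropped.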




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926219569


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48078/
   




[GitHub] [spark] ueshin commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-923484258


   cc @cloud-fan @sigmod @allisonwang-db @HyukjinKwon 




[GitHub] [spark] cloud-fan commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716516207



##########
File path: python/pyspark/pandas/namespace.py
##########
@@ -2747,6 +2751,499 @@ def merge(
     )
 
 
+def merge_asof(
+    left: Union[DataFrame, Series],
+    right: Union[DataFrame, Series],
+    on: Optional[Name] = None,
+    left_on: Optional[Name] = None,
+    right_on: Optional[Name] = None,
+    left_index: bool = False,
+    right_index: bool = False,
+    by: Optional[Union[Name, List[Name]]] = None,
+    left_by: Optional[Union[Name, List[Name]]] = None,
+    right_by: Optional[Union[Name, List[Name]]] = None,
+    suffixes: Tuple[str, str] = ("_x", "_y"),
+    tolerance: Optional[Any] = None,
+    allow_exact_matches: bool = True,
+    direction: str = "backward",
+) -> DataFrame:
+    """
+    Perform an asof merge.
+
+    This is similar to a left-join except that we match on nearest
+    key rather than equal keys.
+
+    For each row in the left DataFrame:
+
+      - A "backward" search selects the last row in the right DataFrame whose
+        'on' key is less than or equal to the left's key.
+
+      - A "forward" search selects the first row in the right DataFrame whose
+        'on' key is greater than or equal to the left's key.
+
+      - A "nearest" search selects the row in the right DataFrame whose 'on'
+        key is closest in absolute distance to the left's key.
+
+    Optionally match on equivalent keys with 'by' before searching with 'on'.
+
+    .. versionadded:: 3.3.0
+
+    Parameters
+    ----------
+    left : DataFrame or named Series
+    right : DataFrame or named Series
+    on : label
+        Field name to join on. Must be found in both DataFrames.
+        The data MUST be ordered. Furthermore this must be a numeric column,
+        such as datetimelike, integer, or float. On or left_on/right_on
+        must be given.
+    left_on : label
+        Field name to join on in left DataFrame.
+    right_on : label
+        Field name to join on in right DataFrame.
+    left_index : bool
+        Use the index of the left DataFrame as the join key.
+    right_index : bool
+        Use the index of the right DataFrame as the join key.
+    by : column name or list of column names
+        Match on these columns before performing merge operation.
+    left_by : column name
+        Field names to match on in the left DataFrame.
+    right_by : column name
+        Field names to match on in the right DataFrame.
+    suffixes : 2-length sequence (tuple, list, ...)
+        Suffix to apply to overlapping column names in the left and right
+        side, respectively.
+    tolerance : int or Timedelta, optional, default None
+        Select asof tolerance within this range; must be compatible
+        with the merge index.
+    allow_exact_matches : bool, default True
+
+        - If True, allow matching with the same 'on' value
+          (i.e. less-than-or-equal-to / greater-than-or-equal-to)
+        - If False, don't match the same 'on' value
+          (i.e., strictly less-than / strictly greater-than).
+
+    direction : 'backward' (default), 'forward', or 'nearest'
+        Whether to search for prior, subsequent, or closest matches.
+
+    Returns
+    -------
+    merged : DataFrame
+
+    See Also
+    --------
+    merge : Merge with a database-style join.
+    merge_ordered : Merge with optional filling/interpolation.
+
+    Examples
+    --------
+    >>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
+    >>> left
+        a left_val
+    0   1        a
+    1   5        b
+    2  10        c
+
+    >>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})
+    >>> right
+       a  right_val
+    0  1          1
+    1  2          2
+    2  3          3
+    3  6          6
+    4  7          7
+
+    >>> ps.merge_asof(left, right, on="a").sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          3
+    2  10        c          7
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     allow_exact_matches=False
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        NaN
+    1   5        b        3.0
+    2  10        c        7.0
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="forward"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        1.0
+    1   5        b        6.0
+    2  10        c        NaN
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="nearest"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          6
+    2  10        c          7
+
+    We can use indexed DataFrames as well.
+
+    >>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
+    >>> left
+       left_val
+    1         a
+    5         b
+    10        c
+
+    >>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 6, 7])
+    >>> right
+       right_val
+    1          1
+    2          2
+    3          3
+    6          6
+    7          7
+
+    >>> ps.merge_asof(left, right, left_index=True, right_index=True).sort_index()
+       left_val  right_val
+    1         a          1
+    5         b          3
+    10        c          7
+
+    Here is a real-world times-series example
+
+    >>> quotes = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.030"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.041"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.049"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.072"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.075")
+    ...         ],
+    ...         "ticker": [
+    ...                "GOOG",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "GOOG",
+    ...                "AAPL",
+    ...                "GOOG",
+    ...                "MSFT"
+    ...            ],
+    ...            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
+    ...            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03]
+    ...     }
+    ... )
+    >>> quotes
+                         time ticker     bid     ask
+    0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
+    1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
+    2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
+    3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
+    4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
+    5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
+    6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
+    7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
+
+    >>> trades = ps.DataFrame(
+    ...        {
+    ...            "time": [
+    ...                pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.038"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048")
+    ...            ],
+    ...            "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
+    ...            "price": [51.95, 51.95, 720.77, 720.92, 98.0],
+    ...            "quantity": [75, 155, 100, 100, 100]
+    ...        }
+    ...    )
+    >>> trades
+                         time ticker   price  quantity
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155
+    2 2016-05-25 13:30:00.048   GOOG  720.77       100
+    3 2016-05-25 13:30:00.048   GOOG  720.92       100
+    4 2016-05-25 13:30:00.048   AAPL   98.00       100
+
+    By default we are taking the asof of the quotes
+
+    >>> ps.merge_asof(
+    ...    trades, quotes, on="time", by="ticker"
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 2ms between the quote time and the trade time
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 10ms between the quote time and the trade time
+    and we exclude exact matches on time. However *prior* data will
+    propagate forward
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 10 MILLISECONDS"),  # pd.Timedelta("10ms")

Review comment:
       If the allowed `tolerance` can only be a number or a `pd.Timedelta` in pandas, maybe we can convert it to a long value on the PySpark side, so that the logical plan can use `toleranceAssertion: Option[Long]`?
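
       A minimal sketch of the conversion suggested above, under stated assumptions: the function name `tolerance_to_long` is hypothetical, and microseconds are chosen as the unit for time-based tolerances because Spark stores timestamps with microsecond precision. Integer tolerances are passed through in the unit of the join key.

```python
from datetime import timedelta
from typing import Optional, Union


def tolerance_to_long(tolerance: Optional[Union[int, timedelta]]) -> Optional[int]:
    """Hypothetical helper: normalize a pandas-style tolerance to a plain integer."""
    if tolerance is None:
        return None
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(tolerance, bool):
        raise TypeError("tolerance must be an int or a timedelta")
    if isinstance(tolerance, timedelta):
        # Assumed unit: whole microseconds.
        return tolerance // timedelta(microseconds=1)
    if isinstance(tolerance, int):
        return tolerance
    raise TypeError("tolerance must be an int or a timedelta")
```

       One trade-off of this design is that the plan then needs to know the unit of the long value for each key type, which an `Expression`-based tolerance carries implicitly.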






[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926224948


   **[Test build #143571 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143571/testReport)** for PR 34053 at commit [`973b09d`](https://github.com/apache/spark/commit/973b09d5e6d9c2360bfdef5ad4c5e69d6bb929f8).




[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r715202851



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
+ *                  FROM right
+ *                  WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance

Review comment:
       Added some rules to `CheckAnalysis`. https://github.com/apache/spark/pull/34053/commits/20c8281b5f8929e0cb6510456d5acbfb3ea5349f
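
       To make the semantics of the rewritten query in the hunk above concrete, here is a plain-Python sketch of a backward, left-outer as-of join over lists of dicts. It is not the Spark implementation; the function name `asof_join_backward` and the row representation are assumptions for illustration. The filter mirrors the `WHERE` clause and the `min(...)` call mirrors `MIN_BY(STRUCT(right.*), left.t - right.t)`.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def asof_join_backward(
    left: List[Row],
    right: List[Row],
    on: str,
    by: Optional[str] = None,
    tolerance: Optional[float] = None,
) -> List[Row]:
    """Illustrative backward as-of join; unmatched left rows get None values."""
    # Right-side columns to carry over (the join keys come from the left row).
    right_cols = [c for c in (right[0] if right else {}) if c not in (on, by)]
    result = []
    for lrow in left:
        # WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance
        candidates = [
            rrow
            for rrow in right
            if (by is None or rrow[by] == lrow[by])
            and lrow[on] >= rrow[on]
            and (tolerance is None or rrow[on] >= lrow[on] - tolerance)
        ]
        # MIN_BY(..., left.t - right.t): pick the closest preceding right row.
        best = min(candidates, key=lambda rrow: lrow[on] - rrow[on], default=None)
        merged = dict(lrow)
        for col in right_cols:
            merged[col] = best[col] if best is not None else None
        result.append(merged)
    return result


left = [{"a": 1, "left_val": "a"}, {"a": 5, "left_val": "b"}, {"a": 10, "left_val": "c"}]
right = [{"a": v, "right_val": v} for v in [1, 2, 3, 6, 7]]
print([row["right_val"] for row in asof_join_backward(left, right, on="a")])
# prints [1, 3, 7], matching the first docstring example
```

       This nested loop is quadratic; the point of the rewrite in the PR is that the scalar subquery form can be handed to the existing decorrelation framework instead.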






[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716998942



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)

Review comment:
       I don't think it is very useful for them to appear in explain output, because they are for internal use only.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -249,6 +249,20 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
               s"join condition '${condition.sql}' " +
                 s"of type ${condition.dataType.catalogString} is not a boolean.")
 
+          case j @ AsOfJoin(_, _, _, Some(condition), _, _, _)
+              if condition.dataType != BooleanType =>
+            failAnalysis(
+              s"join condition '${condition.sql}' " +
+                s"of type ${condition.dataType.catalogString} is not a boolean.")
+
+          case j @ AsOfJoin(_, _, _, _, _, _, Some(toleranceAssertion)) =>
+            if (!toleranceAssertion.foldable) {
+              failAnalysis("Input argument tolerance must be a constant.")
+            }
+            if (!toleranceAssertion.eval().asInstanceOf[Boolean]) {

Review comment:
       That makes sense, but the result of `eval()` is type-dependent. For example, if the `tolerance` is a `CalendarInterval`, the cast will fail, IIUC, whereas if we build the condition from expressions, it will only return a `Boolean` value after the analysis phase. WDYT?

##########
File path: python/pyspark/pandas/namespace.py
##########
@@ -2747,6 +2751,499 @@ def merge(
     )
 
 
+def merge_asof(
+    left: Union[DataFrame, Series],
+    right: Union[DataFrame, Series],
+    on: Optional[Name] = None,
+    left_on: Optional[Name] = None,
+    right_on: Optional[Name] = None,
+    left_index: bool = False,
+    right_index: bool = False,
+    by: Optional[Union[Name, List[Name]]] = None,
+    left_by: Optional[Union[Name, List[Name]]] = None,
+    right_by: Optional[Union[Name, List[Name]]] = None,
+    suffixes: Tuple[str, str] = ("_x", "_y"),
+    tolerance: Optional[Any] = None,
+    allow_exact_matches: bool = True,
+    direction: str = "backward",
+) -> DataFrame:
+    """
+    Perform an asof merge.
+
+    This is similar to a left-join except that we match on nearest
+    key rather than equal keys.
+
+    For each row in the left DataFrame:
+
+      - A "backward" search selects the last row in the right DataFrame whose
+        'on' key is less than or equal to the left's key.
+
+      - A "forward" search selects the first row in the right DataFrame whose
+        'on' key is greater than or equal to the left's key.
+
+      - A "nearest" search selects the row in the right DataFrame whose 'on'
+        key is closest in absolute distance to the left's key.
+
+    Optionally match on equivalent keys with 'by' before searching with 'on'.
+
+    .. versionadded:: 3.3.0
+
+    Parameters
+    ----------
+    left : DataFrame or named Series
+    right : DataFrame or named Series
+    on : label
+        Field name to join on. Must be found in both DataFrames.
+        The data MUST be ordered. Furthermore this must be a numeric column,
+        such as datetimelike, integer, or float. On or left_on/right_on
+        must be given.
+    left_on : label
+        Field name to join on in left DataFrame.
+    right_on : label
+        Field name to join on in right DataFrame.
+    left_index : bool
+        Use the index of the left DataFrame as the join key.
+    right_index : bool
+        Use the index of the right DataFrame as the join key.
+    by : column name or list of column names
+        Match on these columns before performing merge operation.
+    left_by : column name
+        Field names to match on in the left DataFrame.
+    right_by : column name
+        Field names to match on in the right DataFrame.
+    suffixes : 2-length sequence (tuple, list, ...)
+        Suffix to apply to overlapping column names in the left and right
+        side, respectively.
+    tolerance : int or Timedelta, optional, default None
+        Select asof tolerance within this range; must be compatible
+        with the merge index.
+    allow_exact_matches : bool, default True
+
+        - If True, allow matching with the same 'on' value
+          (i.e. less-than-or-equal-to / greater-than-or-equal-to)
+        - If False, don't match the same 'on' value
+          (i.e., strictly less-than / strictly greater-than).
+
+    direction : 'backward' (default), 'forward', or 'nearest'
+        Whether to search for prior, subsequent, or closest matches.
+
+    Returns
+    -------
+    merged : DataFrame
+
+    See Also
+    --------
+    merge : Merge with a database-style join.
+    merge_ordered : Merge with optional filling/interpolation.
+
+    Examples
+    --------
+    >>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
+    >>> left
+        a left_val
+    0   1        a
+    1   5        b
+    2  10        c
+
+    >>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})
+    >>> right
+       a  right_val
+    0  1          1
+    1  2          2
+    2  3          3
+    3  6          6
+    4  7          7
+
+    >>> ps.merge_asof(left, right, on="a").sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          3
+    2  10        c          7
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     allow_exact_matches=False
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        NaN
+    1   5        b        3.0
+    2  10        c        7.0
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="forward"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        1.0
+    1   5        b        6.0
+    2  10        c        NaN
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="nearest"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          6
+    2  10        c          7
+
+    We can use indexed DataFrames as well.
+
+    >>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
+    >>> left
+       left_val
+    1         a
+    5         b
+    10        c
+
+    >>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 6, 7])
+    >>> right
+       right_val
+    1          1
+    2          2
+    3          3
+    6          6
+    7          7
+
+    >>> ps.merge_asof(left, right, left_index=True, right_index=True).sort_index()
+       left_val  right_val
+    1         a          1
+    5         b          3
+    10        c          7
+
+    Here is a real-world times-series example
+
+    >>> quotes = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.030"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.041"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.049"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.072"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.075")
+    ...         ],
+    ...         "ticker": [
+    ...                "GOOG",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "MSFT",
+    ...                "GOOG",
+    ...                "AAPL",
+    ...                "GOOG",
+    ...                "MSFT"
+    ...            ],
+    ...            "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
+    ...            "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03]
+    ...     }
+    ... )
+    >>> quotes
+                         time ticker     bid     ask
+    0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
+    1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
+    2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
+    3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
+    4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
+    5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
+    6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
+    7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
+
+    >>> trades = ps.DataFrame(
+    ...        {
+    ...            "time": [
+    ...                pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.038"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...                pd.Timestamp("2016-05-25 13:30:00.048")
+    ...            ],
+    ...            "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
+    ...            "price": [51.95, 51.95, 720.77, 720.92, 98.0],
+    ...            "quantity": [75, 155, 100, 100, 100]
+    ...        }
+    ...    )
+    >>> trades
+                         time ticker   price  quantity
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155
+    2 2016-05-25 13:30:00.048   GOOG  720.77       100
+    3 2016-05-25 13:30:00.048   GOOG  720.92       100
+    4 2016-05-25 13:30:00.048   AAPL   98.00       100
+
+    By default we are taking the asof of the quotes
+
+    >>> ps.merge_asof(
+    ...    trades, quotes, on="time", by="ticker"
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 2ms between the quote time and the trade time
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 10ms between the quote time and the trade time
+    and we exclude exact matches on time. However *prior* data will
+    propagate forward
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 10 MILLISECONDS"),  # pd.Timedelta("10ms")

Review comment:
       Yes, it should be handled on the PySpark side, but that work is still in progress ([SPARK-33054](https://issues.apache.org/jira/browse/SPARK-33054)). We can remove this workaround once it's done.

##########
File path: python/pyspark/pandas/namespace.py
##########
@@ -2747,6 +2751,499 @@ def merge(
     )
 
 
+def merge_asof(
+    left: Union[DataFrame, Series],
+    right: Union[DataFrame, Series],
+    on: Optional[Name] = None,
+    left_on: Optional[Name] = None,
+    right_on: Optional[Name] = None,
+    left_index: bool = False,
+    right_index: bool = False,
+    by: Optional[Union[Name, List[Name]]] = None,
+    left_by: Optional[Union[Name, List[Name]]] = None,
+    right_by: Optional[Union[Name, List[Name]]] = None,
+    suffixes: Tuple[str, str] = ("_x", "_y"),
+    tolerance: Optional[Any] = None,
+    allow_exact_matches: bool = True,
+    direction: str = "backward",
+) -> DataFrame:
+    """
+    Perform an asof merge.
+
+    This is similar to a left-join except that we match on nearest
+    key rather than equal keys.
+
+    For each row in the left DataFrame:
+
+      - A "backward" search selects the last row in the right DataFrame whose
+        'on' key is less than or equal to the left's key.
+
+      - A "forward" search selects the first row in the right DataFrame whose
+        'on' key is greater than or equal to the left's key.
+
+      - A "nearest" search selects the row in the right DataFrame whose 'on'
+        key is closest in absolute distance to the left's key.
+
+    Optionally match on equivalent keys with 'by' before searching with 'on'.
+
+    .. versionadded:: 3.3.0
+
+    Parameters
+    ----------
+    left : DataFrame or named Series
+    right : DataFrame or named Series
+    on : label
+        Field name to join on. Must be found in both DataFrames.
+        The data MUST be ordered. Furthermore this must be a numeric column,
+        such as datetimelike, integer, or float. On or left_on/right_on
+        must be given.
+    left_on : label
+        Field name to join on in left DataFrame.
+    right_on : label
+        Field name to join on in right DataFrame.
+    left_index : bool
+        Use the index of the left DataFrame as the join key.
+    right_index : bool
+        Use the index of the right DataFrame as the join key.
+    by : column name or list of column names
+        Match on these columns before performing merge operation.
+    left_by : column name
+        Field names to match on in the left DataFrame.
+    right_by : column name
+        Field names to match on in the right DataFrame.
+    suffixes : 2-length sequence (tuple, list, ...)
+        Suffix to apply to overlapping column names in the left and right
+        side, respectively.
+    tolerance : int or Timedelta, optional, default None
+        Select asof tolerance within this range; must be compatible
+        with the merge index.
+    allow_exact_matches : bool, default True
+
+        - If True, allow matching with the same 'on' value
+          (i.e. less-than-or-equal-to / greater-than-or-equal-to)
+        - If False, don't match the same 'on' value
+          (i.e., strictly less-than / strictly greater-than).
+
+    direction : 'backward' (default), 'forward', or 'nearest'
+        Whether to search for prior, subsequent, or closest matches.
+
+    Returns
+    -------
+    merged : DataFrame
+
+    See Also
+    --------
+    merge : Merge with a database-style join.
+    merge_ordered : Merge with optional filling/interpolation.
+
+    Examples
+    --------
+    >>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
+    >>> left
+        a left_val
+    0   1        a
+    1   5        b
+    2  10        c
+
+    >>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})
+    >>> right
+       a  right_val
+    0  1          1
+    1  2          2
+    2  3          3
+    3  6          6
+    4  7          7
+
+    >>> ps.merge_asof(left, right, on="a").sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          3
+    2  10        c          7
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     allow_exact_matches=False
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        NaN
+    1   5        b        3.0
+    2  10        c        7.0
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="forward"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        1.0
+    1   5        b        6.0
+    2  10        c        NaN
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="nearest"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          6
+    2  10        c          7
+
+    We can use indexed DataFrames as well.
+
+    >>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
+    >>> left
+       left_val
+    1         a
+    5         b
+    10        c
+
+    >>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 6, 7])
+    >>> right
+       right_val
+    1          1
+    2          2
+    3          3
+    6          6
+    7          7
+
+    >>> ps.merge_asof(left, right, left_index=True, right_index=True).sort_index()
+       left_val  right_val
+    1         a          1
+    5         b          3
+    10        c          7
+
+    Here is a real-world time-series example
+
+    >>> quotes = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.030"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.041"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.049"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.072"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.075")
+    ...         ],
+    ...         "ticker": [
+    ...             "GOOG",
+    ...             "MSFT",
+    ...             "MSFT",
+    ...             "MSFT",
+    ...             "GOOG",
+    ...             "AAPL",
+    ...             "GOOG",
+    ...             "MSFT"
+    ...         ],
+    ...         "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
+    ...         "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03]
+    ...     }
+    ... )
+    >>> quotes
+                         time ticker     bid     ask
+    0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
+    1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
+    2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
+    3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
+    4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
+    5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
+    6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
+    7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
+
+    >>> trades = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.038"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048")
+    ...         ],
+    ...         "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
+    ...         "price": [51.95, 51.95, 720.77, 720.92, 98.0],
+    ...         "quantity": [75, 155, 100, 100, 100]
+    ...     }
+    ... )
+    >>> trades
+                         time ticker   price  quantity
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155
+    2 2016-05-25 13:30:00.048   GOOG  720.77       100
+    3 2016-05-25 13:30:00.048   GOOG  720.92       100
+    4 2016-05-25 13:30:00.048   AAPL   98.00       100
+
+    By default we are taking the asof of the quotes
+
+    >>> ps.merge_asof(
+    ...    trades, quotes, on="time", by="ticker"
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 2ms between the quote time and the trade time
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 10ms between the quote time and the trade time
+    and we exclude exact matches on time. However *prior* data will
+    propagate forward
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 10 MILLISECONDS"),  # pd.Timedelta("10ms")

Review comment:
       Yes, it should be handled on the PySpark side, but that is still WIP ([SPARK-33054](https://issues.apache.org/jira/browse/SPARK-33054)). We can remove this workaround once it's done.
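
The workaround in the quoted doctest passes the tolerance as `F.expr("INTERVAL 2 MILLISECONDS")` where pandas would take `pd.Timedelta("2ms")`. As a rough sketch of what a user-side stopgap could look like, the hypothetical helper below (`timedelta_to_interval_sql` is not part of this PR or of PySpark) renders a standard-library `timedelta` as an interval literal string. It assumes Spark SQL accepts the `MILLISECONDS` and `MICROSECONDS` units in this form, and it ignores any sub-microsecond precision.

```python
from datetime import timedelta


def timedelta_to_interval_sql(delta: timedelta) -> str:
    """Render a timedelta as a Spark SQL interval literal string.

    Hypothetical helper for illustration only; not part of PySpark.
    """
    # Build the total number of microseconds with integer arithmetic
    # so that no float rounding is involved.
    micros = (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds
    if micros % 1_000 == 0:
        # Whole milliseconds: use the same unit as the quoted doctest.
        return f"INTERVAL {micros // 1_000} MILLISECONDS"
    return f"INTERVAL {micros} MICROSECONDS"


print(timedelta_to_interval_sql(timedelta(milliseconds=2)))
```

The resulting string would then be wrapped as `F.expr(...)` and passed as `tolerance`, in the same way the doctest does by hand.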

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case _ =>
+        left.output ++ right.output
+    }
+  }
+
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+  override lazy val resolved: Boolean = {
+    childrenResolved &&
+      expressions.forall(_.resolved) &&
+      duplicateResolved &&
+      asOfCondition.dataType == BooleanType &&
+      condition.forall(_.dataType == BooleanType) &&
+      toleranceAssertion.forall { assertion =>
+        assertion.foldable && assertion.eval().asInstanceOf[Boolean]
+      }
+  }
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(AS_OF_JOIN)
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): AsOfJoin = {
+    copy(left = newLeft, right = newRight)
+  }
+}
+
+object AsOfJoin {
+
+  def apply(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      condition: Option[Expression],
+      joinType: JoinType,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): AsOfJoin = {
+    val asOfCond = makeAsOfCond(leftAsOf, rightAsOf, tolerance, allowExactMatches, direction)
+    val orderingExpr = makeOrderingExpr(leftAsOf, rightAsOf, direction)
+    AsOfJoin(left, right, asOfCond, condition, joinType,
+      orderingExpr, tolerance.map(t => GreaterThanOrEqual(t, Literal.default(t.dataType))))
+  }
+
+  private def makeAsOfCond(
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): Expression = {
+    val base = (allowExactMatches, direction) match {
+      case (true, Backward) => GreaterThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Backward) => GreaterThan(leftAsOf, rightAsOf)
+      case (true, Forward) => LessThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Forward) => LessThan(leftAsOf, rightAsOf)
+      case (true, Nearest) => Literal.TrueLiteral
+      case (false, Nearest) => Not(EqualTo(leftAsOf, rightAsOf))
+    }
+    tolerance match {
+      case Some(tolerance) =>
+        (allowExactMatches, direction) match {
+          case (true, Backward) =>
+            And(base, GreaterThanOrEqual(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (false, Backward) =>
+            And(base, GreaterThan(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (true, Forward) =>
+            And(base, LessThanOrEqual(rightAsOf, Add(leftAsOf, tolerance)))

Review comment:
       I think we can leave it as-is, because the `Analyzer` should replace it with a proper expression; otherwise we would have to handle it by hand here as well.
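
To make the condition built by `makeAsOfCond` easier to follow, here is a minimal plain-Python sketch of the same predicate. The function name `as_of_match` is made up for illustration. The backward cases and the inclusive forward case follow the quoted hunk; the strict forward case and the `nearest` tolerance check are not visible in the hunk and are assumptions. In Python, `datetime` and `timedelta` arithmetic works directly, which is roughly the role the comment above expects the `Analyzer` to play for the generic `Add`/`Subtract` expressions.

```python
from datetime import datetime, timedelta


def as_of_match(left_key, right_key, tolerance=None,
                allow_exact_matches=True, direction="backward"):
    """Return True if right_key is an admissible as-of match for left_key."""
    if direction == "backward":
        # Right key must not be after the left key.
        ok = left_key >= right_key if allow_exact_matches else left_key > right_key
        if tolerance is not None:
            lower = left_key - tolerance
            ok = ok and (right_key >= lower if allow_exact_matches else right_key > lower)
    elif direction == "forward":
        # Right key must not be before the left key.
        ok = left_key <= right_key if allow_exact_matches else left_key < right_key
        if tolerance is not None:
            upper = left_key + tolerance
            # The strict (<) variant is an assumption; the hunk is cut off before it.
            ok = ok and (right_key <= upper if allow_exact_matches else right_key < upper)
    elif direction == "nearest":
        ok = True if allow_exact_matches else left_key != right_key
        if tolerance is not None:
            # Assumption: the nearest case bounds the absolute distance.
            ok = ok and abs(left_key - right_key) <= tolerance
    else:
        raise ValueError(f"unknown direction: {direction!r}")
    return ok


trade = datetime(2016, 5, 25, 13, 30, 0, 38000)
quote = datetime(2016, 5, 25, 13, 30, 0, 30000)
print(as_of_match(trade, quote))                                     # no tolerance
print(as_of_match(trade, quote, tolerance=timedelta(milliseconds=2)))  # 2 ms window
```

The two calls correspond to the MSFT trade at 13:30:00.038 in the docstring: its latest earlier quote is 8 ms old, so it matches without a tolerance and falls outside a 2 ms window.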

##########
File path: python/pyspark/pandas/namespace.py
##########
@@ -2747,6 +2751,499 @@ def merge(
     )
 
 
+def merge_asof(
+    left: Union[DataFrame, Series],
+    right: Union[DataFrame, Series],
+    on: Optional[Name] = None,
+    left_on: Optional[Name] = None,
+    right_on: Optional[Name] = None,
+    left_index: bool = False,
+    right_index: bool = False,
+    by: Optional[Union[Name, List[Name]]] = None,
+    left_by: Optional[Union[Name, List[Name]]] = None,
+    right_by: Optional[Union[Name, List[Name]]] = None,
+    suffixes: Tuple[str, str] = ("_x", "_y"),
+    tolerance: Optional[Any] = None,
+    allow_exact_matches: bool = True,
+    direction: str = "backward",
+) -> DataFrame:
+    """
+    Perform an asof merge.
+
+    This is similar to a left-join except that we match on nearest
+    key rather than equal keys.
+
+    For each row in the left DataFrame:
+
+      - A "backward" search selects the last row in the right DataFrame whose
+        'on' key is less than or equal to the left's key.
+
+      - A "forward" search selects the first row in the right DataFrame whose
+        'on' key is greater than or equal to the left's key.
+
+      - A "nearest" search selects the row in the right DataFrame whose 'on'
+        key is closest in absolute distance to the left's key.
+
+    Optionally match on equivalent keys with 'by' before searching with 'on'.
+
+    .. versionadded:: 3.3.0
+
+    Parameters
+    ----------
+    left : DataFrame or named Series
+    right : DataFrame or named Series
+    on : label
+        Field name to join on. Must be found in both DataFrames.
+        The data MUST be ordered. Furthermore this must be a numeric column,
+        such as datetimelike, integer, or float. On or left_on/right_on
+        must be given.
+    left_on : label
+        Field name to join on in left DataFrame.
+    right_on : label
+        Field name to join on in right DataFrame.
+    left_index : bool
+        Use the index of the left DataFrame as the join key.
+    right_index : bool
+        Use the index of the right DataFrame as the join key.
+    by : column name or list of column names
+        Match on these columns before performing merge operation.
+    left_by : column name
+        Field names to match on in the left DataFrame.
+    right_by : column name
+        Field names to match on in the right DataFrame.
+    suffixes : 2-length sequence (tuple, list, ...)
+        Suffix to apply to overlapping column names in the left and right
+        side, respectively.
+    tolerance : int or Timedelta, optional, default None
+        Select asof tolerance within this range; must be compatible
+        with the merge index.
+    allow_exact_matches : bool, default True
+
+        - If True, allow matching with the same 'on' value
+          (i.e. less-than-or-equal-to / greater-than-or-equal-to)
+        - If False, don't match the same 'on' value
+          (i.e., strictly less-than / strictly greater-than).
+
+    direction : 'backward' (default), 'forward', or 'nearest'
+        Whether to search for prior, subsequent, or closest matches.
+
+    Returns
+    -------
+    merged : DataFrame
+
+    See Also
+    --------
+    merge : Merge with a database-style join.
+    merge_ordered : Merge with optional filling/interpolation.
+
+    Examples
+    --------
+    >>> left = ps.DataFrame({"a": [1, 5, 10], "left_val": ["a", "b", "c"]})
+    >>> left
+        a left_val
+    0   1        a
+    1   5        b
+    2  10        c
+
+    >>> right = ps.DataFrame({"a": [1, 2, 3, 6, 7], "right_val": [1, 2, 3, 6, 7]})
+    >>> right
+       a  right_val
+    0  1          1
+    1  2          2
+    2  3          3
+    3  6          6
+    4  7          7
+
+    >>> ps.merge_asof(left, right, on="a").sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          3
+    2  10        c          7
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     allow_exact_matches=False
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        NaN
+    1   5        b        3.0
+    2  10        c        7.0
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="forward"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a        1.0
+    1   5        b        6.0
+    2  10        c        NaN
+
+    >>> ps.merge_asof(
+    ...     left,
+    ...     right,
+    ...     on="a",
+    ...     direction="nearest"
+    ... ).sort_values("a").reset_index(drop=True)
+        a left_val  right_val
+    0   1        a          1
+    1   5        b          6
+    2  10        c          7
+
+    We can use indexed DataFrames as well.
+
+    >>> left = ps.DataFrame({"left_val": ["a", "b", "c"]}, index=[1, 5, 10])
+    >>> left
+       left_val
+    1         a
+    5         b
+    10        c
+
+    >>> right = ps.DataFrame({"right_val": [1, 2, 3, 6, 7]}, index=[1, 2, 3, 6, 7])
+    >>> right
+       right_val
+    1          1
+    2          2
+    3          3
+    6          6
+    7          7
+
+    >>> ps.merge_asof(left, right, left_index=True, right_index=True).sort_index()
+       left_val  right_val
+    1         a          1
+    5         b          3
+    10        c          7
+
+    Here is a real-world time-series example
+
+    >>> quotes = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.030"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.041"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.049"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.072"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.075")
+    ...         ],
+    ...         "ticker": [
+    ...             "GOOG",
+    ...             "MSFT",
+    ...             "MSFT",
+    ...             "MSFT",
+    ...             "GOOG",
+    ...             "AAPL",
+    ...             "GOOG",
+    ...             "MSFT"
+    ...         ],
+    ...         "bid": [720.50, 51.95, 51.97, 51.99, 720.50, 97.99, 720.50, 52.01],
+    ...         "ask": [720.93, 51.96, 51.98, 52.00, 720.93, 98.01, 720.88, 52.03]
+    ...     }
+    ... )
+    >>> quotes
+                         time ticker     bid     ask
+    0 2016-05-25 13:30:00.023   GOOG  720.50  720.93
+    1 2016-05-25 13:30:00.023   MSFT   51.95   51.96
+    2 2016-05-25 13:30:00.030   MSFT   51.97   51.98
+    3 2016-05-25 13:30:00.041   MSFT   51.99   52.00
+    4 2016-05-25 13:30:00.048   GOOG  720.50  720.93
+    5 2016-05-25 13:30:00.049   AAPL   97.99   98.01
+    6 2016-05-25 13:30:00.072   GOOG  720.50  720.88
+    7 2016-05-25 13:30:00.075   MSFT   52.01   52.03
+
+    >>> trades = ps.DataFrame(
+    ...     {
+    ...         "time": [
+    ...             pd.Timestamp("2016-05-25 13:30:00.023"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.038"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048"),
+    ...             pd.Timestamp("2016-05-25 13:30:00.048")
+    ...         ],
+    ...         "ticker": ["MSFT", "MSFT", "GOOG", "GOOG", "AAPL"],
+    ...         "price": [51.95, 51.95, 720.77, 720.92, 98.0],
+    ...         "quantity": [75, 155, 100, 100, 100]
+    ...     }
+    ... )
+    >>> trades
+                         time ticker   price  quantity
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155
+    2 2016-05-25 13:30:00.048   GOOG  720.77       100
+    3 2016-05-25 13:30:00.048   GOOG  720.92       100
+    4 2016-05-25 13:30:00.048   AAPL   98.00       100
+
+    By default we are taking the asof of the quotes
+
+    >>> ps.merge_asof(
+    ...    trades, quotes, on="time", by="ticker"
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155   51.97   51.98
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 2ms between the quote time and the trade time
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 2 MILLISECONDS")  # pd.Timedelta("2ms")
+    ... ).sort_values(["time", "ticker", "price"]).reset_index(drop=True)
+                         time ticker   price  quantity     bid     ask
+    0 2016-05-25 13:30:00.023   MSFT   51.95        75   51.95   51.96
+    1 2016-05-25 13:30:00.038   MSFT   51.95       155     NaN     NaN
+    2 2016-05-25 13:30:00.048   AAPL   98.00       100     NaN     NaN
+    3 2016-05-25 13:30:00.048   GOOG  720.77       100  720.50  720.93
+    4 2016-05-25 13:30:00.048   GOOG  720.92       100  720.50  720.93
+
+    We only asof within 10ms between the quote time and the trade time
+    and we exclude exact matches on time. However *prior* data will
+    propagate forward
+
+    >>> ps.merge_asof(
+    ...     trades,
+    ...     quotes,
+    ...     on="time",
+    ...     by="ticker",
+    ...     tolerance=F.expr("INTERVAL 10 MILLISECONDS"),  # pd.Timedelta("10ms")

Review comment:
       Yes, it should be handled on the PySpark side, but that is still WIP ([SPARK-33054](https://issues.apache.org/jira/browse/SPARK-33054)). We can remove this workaround once it's done.
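
For cross-checking the small integer examples in the quoted docstring, a brute-force reference in plain Python may help. This is only a sketch: `merge_asof_reference` is a hypothetical name, rows are plain dicts, `by` and `tolerance` are left out, and the "pick the candidate with the smallest distance" step loosely mirrors the `MIN_BY` rewrite from the PR description rather than the actual Spark plan.

```python
def merge_asof_reference(left, right, on, direction="backward",
                         allow_exact_matches=True):
    """Brute-force as-of merge over lists of dict rows (illustration only)."""
    if direction not in ("backward", "forward", "nearest"):
        raise ValueError(f"unknown direction: {direction!r}")
    # Columns contributed by the right side, excluding the shared key.
    right_cols = [c for c in (right[0] if right else {}) if c != on]
    merged = []
    for lrow in left:
        candidates = []
        for rrow in right:
            diff = lrow[on] - rrow[on]
            if diff == 0 and not allow_exact_matches:
                continue
            if direction == "backward" and diff < 0:
                continue
            if direction == "forward" and diff > 0:
                continue
            candidates.append((abs(diff), rrow))
        # Keep the admissible right row with the smallest distance, if any.
        best = min(candidates, key=lambda c: c[0])[1] if candidates else None
        out = dict(lrow)
        for col in right_cols:
            out[col] = best[col] if best is not None else None
        merged.append(out)
    return merged


left = [{"a": 1, "left_val": "a"}, {"a": 5, "left_val": "b"}, {"a": 10, "left_val": "c"}]
right = [{"a": k, "right_val": k} for k in (1, 2, 3, 6, 7)]
for row in merge_asof_reference(left, right, on="a", direction="forward"):
    print(row)
```

Unmatched rows carry `None` here, where the docstring output shows `NaN`.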

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case _ =>
+        left.output ++ right.output
+    }
+  }
+
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+  override lazy val resolved: Boolean = {
+    childrenResolved &&
+      expressions.forall(_.resolved) &&
+      duplicateResolved &&
+      asOfCondition.dataType == BooleanType &&
+      condition.forall(_.dataType == BooleanType) &&
+      toleranceAssertion.forall { assertion =>
+        assertion.foldable && assertion.eval().asInstanceOf[Boolean]
+      }
+  }
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(AS_OF_JOIN)
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): AsOfJoin = {
+    copy(left = newLeft, right = newRight)
+  }
+}
+
+object AsOfJoin {
+
+  def apply(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      condition: Option[Expression],
+      joinType: JoinType,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): AsOfJoin = {
+    val asOfCond = makeAsOfCond(leftAsOf, rightAsOf, tolerance, allowExactMatches, direction)
+    val orderingExpr = makeOrderingExpr(leftAsOf, rightAsOf, direction)
+    AsOfJoin(left, right, asOfCond, condition, joinType,
+      orderingExpr, tolerance.map(t => GreaterThanOrEqual(t, Literal.default(t.dataType))))
+  }
+
+  private def makeAsOfCond(
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): Expression = {
+    val base = (allowExactMatches, direction) match {
+      case (true, Backward) => GreaterThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Backward) => GreaterThan(leftAsOf, rightAsOf)
+      case (true, Forward) => LessThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Forward) => LessThan(leftAsOf, rightAsOf)
+      case (true, Nearest) => Literal.TrueLiteral
+      case (false, Nearest) => Not(EqualTo(leftAsOf, rightAsOf))
+    }
+    tolerance match {
+      case Some(tolerance) =>
+        (allowExactMatches, direction) match {
+          case (true, Backward) =>
+            And(base, GreaterThanOrEqual(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (false, Backward) =>
+            And(base, GreaterThan(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (true, Forward) =>
+            And(base, LessThanOrEqual(rightAsOf, Add(leftAsOf, tolerance)))

Review comment:
       I think we can leave it as-is, because the `Analyzer` should replace it with a proper expression; otherwise we would have to handle it by hand here as well.







[GitHub] [spark] cloud-fan commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716513941



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -249,6 +249,20 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
               s"join condition '${condition.sql}' " +
                 s"of type ${condition.dataType.catalogString} is not a boolean.")
 
+          case j @ AsOfJoin(_, _, _, Some(condition), _, _, _)
+              if condition.dataType != BooleanType =>
+            failAnalysis(
+              s"join condition '${condition.sql}' " +
+                s"of type ${condition.dataType.catalogString} is not a boolean.")
+
+          case j @ AsOfJoin(_, _, _, _, _, _, Some(toleranceAssertion)) =>
+            if (!toleranceAssertion.foldable) {
+              failAnalysis("Input argument tolerance must be a constant.")
+            }
+            if (!toleranceAssertion.eval().asInstanceOf[Boolean]) {

Review comment:
       I think `.asInstanceOf[Number].longValue >= 0` is clearer.
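
For readers less familiar with Catalyst, the check under discussion can be sketched in plain Python. In the quoted diff the assertion is built as `GreaterThanOrEqual(t, Literal.default(t.dataType))`, which appears to compare the tolerance against the default (zero) value of its own type. The sketch below imitates that with `type(tolerance)()`. The function name and the error message are assumptions, and the "must be a constant" check has no counterpart here because a Python value is already constant.

```python
from datetime import timedelta


def check_tolerance(tolerance):
    """Reject negative tolerances; None means no tolerance was given."""
    if tolerance is None:
        return
    # int() == 0, float() == 0.0 and timedelta() is a zero duration,
    # so this yields a zero of the same type as the tolerance.
    zero = type(tolerance)()
    if not tolerance >= zero:
        # Assumed wording; the corresponding message is not shown in the hunk.
        raise ValueError("Input argument tolerance must be non-negative.")


check_tolerance(timedelta(milliseconds=2))  # passes silently
check_tolerance(0)                          # zero is allowed
```

Comparing against a typed zero keeps the check uniform across numeric and interval tolerances, whereas a check written as `longValue >= 0` would presumably apply only to numeric values.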






[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716999204



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)
+
+  override def output: Seq[Attribute] = {
+    joinType match {
+      case LeftOuter =>
+        left.output ++ right.output.map(_.withNullability(true))
+      case _ =>
+        left.output ++ right.output
+    }
+  }
+
+  def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty
+
+  override lazy val resolved: Boolean = {
+    childrenResolved &&
+      expressions.forall(_.resolved) &&
+      duplicateResolved &&
+      asOfCondition.dataType == BooleanType &&
+      condition.forall(_.dataType == BooleanType) &&
+      toleranceAssertion.forall { assertion =>
+        assertion.foldable && assertion.eval().asInstanceOf[Boolean]
+      }
+  }
+
+  final override val nodePatterns: Seq[TreePattern] = Seq(AS_OF_JOIN)
+
+  override protected def withNewChildrenInternal(
+      newLeft: LogicalPlan, newRight: LogicalPlan): AsOfJoin = {
+    copy(left = newLeft, right = newRight)
+  }
+}
+
+object AsOfJoin {
+
+  def apply(
+      left: LogicalPlan,
+      right: LogicalPlan,
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      condition: Option[Expression],
+      joinType: JoinType,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): AsOfJoin = {
+    val asOfCond = makeAsOfCond(leftAsOf, rightAsOf, tolerance, allowExactMatches, direction)
+    val orderingExpr = makeOrderingExpr(leftAsOf, rightAsOf, direction)
+    AsOfJoin(left, right, asOfCond, condition, joinType,
+      orderingExpr, tolerance.map(t => GreaterThanOrEqual(t, Literal.default(t.dataType))))
+  }
+
+  private def makeAsOfCond(
+      leftAsOf: Expression,
+      rightAsOf: Expression,
+      tolerance: Option[Expression],
+      allowExactMatches: Boolean,
+      direction: AsOfJoinDirection): Expression = {
+    val base = (allowExactMatches, direction) match {
+      case (true, Backward) => GreaterThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Backward) => GreaterThan(leftAsOf, rightAsOf)
+      case (true, Forward) => LessThanOrEqual(leftAsOf, rightAsOf)
+      case (false, Forward) => LessThan(leftAsOf, rightAsOf)
+      case (true, Nearest) => Literal.TrueLiteral
+      case (false, Nearest) => Not(EqualTo(leftAsOf, rightAsOf))
+    }
+    tolerance match {
+      case Some(tolerance) =>
+        (allowExactMatches, direction) match {
+          case (true, Backward) =>
+            And(base, GreaterThanOrEqual(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (false, Backward) =>
+            And(base, GreaterThan(rightAsOf, Subtract(leftAsOf, tolerance)))
+          case (true, Forward) =>
+            And(base, LessThanOrEqual(rightAsOf, Add(leftAsOf, tolerance)))

Review comment:
       I think we can leave it as-is because `Analyzer` should replace it with a proper expression; otherwise we have to manage it here as well.
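
For readers following the thread, here is a minimal Python sketch of the predicate that `makeAsOfCond` builds in the hunk quoted above. It covers only the cases visible in that hunk: all six base conditions, plus the backward tolerance window. The function names `base_condition` and `backward_condition` are hypothetical, and plain Python values stand in for Catalyst expressions.

```python
def base_condition(left_t, right_t, allow_exact_matches, direction):
    """Base as-of predicate, mirroring the first match in makeAsOfCond."""
    if direction == "backward":
        return left_t >= right_t if allow_exact_matches else left_t > right_t
    if direction == "forward":
        return left_t <= right_t if allow_exact_matches else left_t < right_t
    if direction == "nearest":
        # With exact matches allowed, every right row is a candidate.
        return True if allow_exact_matches else left_t != right_t
    raise ValueError(f"unknown direction: {direction!r}")


def backward_condition(left_t, right_t, tolerance=None, allow_exact_matches=True):
    """Backward predicate with the optional tolerance window from the diff."""
    base = base_condition(left_t, right_t, allow_exact_matches, "backward")
    if tolerance is None:
        return base
    lower_bound = left_t - tolerance
    # Inclusive bound when exact matches are allowed, exclusive otherwise.
    in_window = right_t >= lower_bound if allow_exact_matches else right_t > lower_bound
    return base and in_window
```

Under these assumptions, `backward_condition(5, 3, tolerance=1)` is false because the right value lies outside the window `[4, 5]`.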






[GitHub] [spark] cloud-fan commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r717305177



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -249,6 +249,20 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
               s"join condition '${condition.sql}' " +
                 s"of type ${condition.dataType.catalogString} is not a boolean.")
 
+          case j @ AsOfJoin(_, _, _, Some(condition), _, _, _)
+              if condition.dataType != BooleanType =>
+            failAnalysis(
+              s"join condition '${condition.sql}' " +
+                s"of type ${condition.dataType.catalogString} is not a boolean.")
+
+          case j @ AsOfJoin(_, _, _, _, _, _, Some(toleranceAssertion)) =>
+            if (!toleranceAssertion.foldable) {
+              failAnalysis("Input argument tolerance must be a constant.")
+            }
+            if (!toleranceAssertion.eval().asInstanceOf[Boolean]) {

Review comment:
       ah that's why it's called `toleranceAssertion`, it's a predicate!
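
As a rough illustration of the point above: `AsOfJoin.apply` wraps the tolerance in `GreaterThanOrEqual(t, Literal.default(t.dataType))`, so the stored value is a predicate that the analysis check folds and evaluates. The Python sketch below mimics that idea for numeric tolerances only. It assumes the default literal of a numeric type is zero; the function names and the second error message are hypothetical.

```python
from numbers import Real


def tolerance_assertion(tolerance):
    # Stand-in for GreaterThanOrEqual(t, Literal.default(t.dataType)),
    # assuming the default literal of a numeric type is zero.
    return tolerance >= 0


def check_tolerance(tolerance):
    """Raise if the tolerance is not a non-negative numeric constant."""
    if tolerance is None:
        return  # no tolerance given, so nothing to assert
    # A non-numeric value models a non-foldable (non-constant) expression.
    if isinstance(tolerance, bool) or not isinstance(tolerance, Real):
        raise ValueError("Input argument tolerance must be a constant.")
    if not tolerance_assertion(tolerance):
        # Hypothetical wording; the quoted diff is cut off before this message.
        raise ValueError("tolerance must be non-negative")
```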






[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-923645302


   **[Test build #143468 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143468/testReport)** for PR 34053 at commit [`c8237f4`](https://github.com/apache/spark/commit/c8237f46c325a0f95d4793a809d2bcc53cb948c2).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds the following public classes _(experimental)_:
     * `case class AsOfJoin(`




[GitHub] [spark] SparkQA removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926252459


   **[Test build #143575 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143575/testReport)** for PR 34053 at commit [`cd0f707`](https://github.com/apache/spark/commit/cd0f7070b4a504d2aba57d7e4b71fcc225731603).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926353009


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143575/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926288171


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48085/
   




[GitHub] [spark] AmplabJenkins commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926223898


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48078/
   




[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926196326


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48078/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926269627


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48080/
   




[GitHub] [spark] AmplabJenkins commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926269627


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48080/
   




[GitHub] [spark] cloud-fan commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r715020883



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
+ *                  FROM right
+ *                  WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance
+ *             ) as __right__
+ *        FROM left
+ *        )
+ * }}}
+ */
+object RewriteAsOfJoin extends Rule[LogicalPlan] {

Review comment:
       nit: move it to a new file?






[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r715051356



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/DataFrameAsOfJoinSuite.scala
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.functions._
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+class DataFrameAsOfJoinSuite extends QueryTest
+  with SharedSparkSession
+  with AdaptiveSparkPlanHelper {
+
+  def prepareForAsOfJoin(): (DataFrame, DataFrame) = {
+    val schema1 = StructType(
+      StructField("a", IntegerType, false) ::
+        StructField("b", StringType, false) ::
+        StructField("left_val", StringType, false) :: Nil)
+    val rowSeq1: List[Row] = List(Row(1, "x", "a"), Row(5, "y", "b"), Row(10, "z", "c"))
+    val df1 = spark.createDataFrame(rowSeq1.asJava, schema1)
+
+    val schema2 = StructType(
+      StructField("a", IntegerType) ::
+        StructField("b", StringType) ::
+        StructField("right_val", IntegerType) :: Nil)
+    val rowSeq2: List[Row] = List(Row(1, "v", 1), Row(2, "w", 2), Row(3, "x", 3),
+      Row(6, "y", 6), Row(7, "z", 7))
+    val df2 = spark.createDataFrame(rowSeq2.asJava, schema2)
+
+    (df1, df2)
+  }
+
+  test("as-of join - simple") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(
+        df2, df1.col("a"), df2.col("a"), usingColumns = Seq.empty,
+        joinType = "left", tolerance = null, allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", 1, "v", 1),
+        Row(5, "y", "b", 3, "x", 3),
+        Row(10, "z", "c", 7, "z", 7)
+      )
+    )
+  }
+
+  test("as-of join - usingColumns") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq("b"),
+        joinType = "left", tolerance = null, allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", null, null, null),
+        Row(5, "y", "b", null, null, null),
+        Row(10, "z", "c", 7, "z", 7)
+      )
+    )
+  }
+
+  test("as-of join - usingColumns, inner") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq("b"),
+        joinType = "inner", tolerance = null, allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(10, "z", "c", 7, "z", 7)
+      )
+    )
+  }
+
+  test("as-of join - tolerance = 1") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq.empty,
+        joinType = "left", tolerance = lit(1), allowExactMatches = true, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", 1, "v", 1),
+        Row(5, "y", "b", null, null, null),
+        Row(10, "z", "c", null, null, null)
+      )
+    )
+  }
+
+  test("as-of join - allowExactMatches = false") {
+    val (df1, df2) = prepareForAsOfJoin()
+    checkAnswer(
+      df1.joinAsOf(df2, df1.col("a"), df2.col("a"), usingColumns = Seq.empty,
+        joinType = "left", tolerance = null, allowExactMatches = false, direction = "backward"),
+      Seq(
+        Row(1, "x", "a", null, null, null),

Review comment:
       As for the pandas-on-Spark example: https://github.com/apache/spark/blob/c8237f46c325a0f95d4793a809d2bcc53cb948c2/python/pyspark/pandas/namespace.py#L2863-L2872
   
   pandas represents missing values in numeric columns as `NaN`, so the example shows `NaN`. For a regular (non-pandas-on-Spark) DataFrame, we can leave it as `null`.
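
To illustrate where those `null`s come from, here is a small Python sketch of a backward as-of join written in the spirit of the `MIN_BY` rewrite: filter the right rows by the as-of condition, then keep the one with the smallest `left.t - right.t`. It is an illustration only, not the Spark implementation. The function name `as_of_join_backward` is hypothetical, rows are plain tuples whose first element is the as-of key, and the data mirrors `prepareForAsOfJoin` in the quoted test suite.

```python
def as_of_join_backward(left_rows, right_rows, tolerance=None, allow_exact_matches=True):
    """Left-outer backward as-of join on the first tuple element."""
    right_width = len(right_rows[0]) if right_rows else 0
    joined = []
    for left in left_rows:
        lt = left[0]
        candidates = []
        for right in right_rows:
            rt = right[0]
            ok = lt >= rt if allow_exact_matches else lt > rt
            if ok and tolerance is not None:
                ok = rt >= lt - tolerance if allow_exact_matches else rt > lt - tolerance
            if ok:
                candidates.append(right)
        if candidates:
            # MIN_BY(STRUCT(right.*), left.t - right.t)
            nearest = min(candidates, key=lambda r: lt - r[0])
        else:
            # No match: the right side stays null (None here, NaN in pandas).
            nearest = (None,) * right_width
        joined.append(left + nearest)
    return joined


left = [(1, "x", "a"), (5, "y", "b"), (10, "z", "c")]
right = [(1, "v", 1), (2, "w", 2), (3, "x", 3), (6, "y", 6), (7, "z", 7)]
```

With `allow_exact_matches=False`, the first left row has no strictly earlier right row, so its right-hand columns are all `None`, which corresponds to the `null`s in the expected `Row` above.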






[GitHub] [spark] HyukjinKwon commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-929720849


   Merged to master.




[GitHub] [spark] allisonwang-db commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
allisonwang-db commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r715076106



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -41,14 +41,18 @@ case class ReferenceEqualPlanWrapper(plan: LogicalPlan) {
 object DeduplicateRelations extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = {
     renewDuplicatedRelations(mutable.HashSet.empty, plan)._1.resolveOperatorsUpWithPruning(
-      _.containsAnyPattern(JOIN, LATERAL_JOIN, INTERSECT, EXCEPT, UNION, COMMAND), ruleId) {
+      _.containsAnyPattern(JOIN, LATERAL_JOIN, AS_OF_JOIN, INTERSECT, EXCEPT, UNION, COMMAND),
+      ruleId) {
       case p: LogicalPlan if !p.childrenResolved => p
       // To resolve duplicate expression IDs for Join.
       case j @ Join(left, right, _, _, _) if !j.duplicateResolved =>
         j.copy(right = dedupRight(left, right))
       // Resolve duplicate output for LateralJoin.
       case j @ LateralJoin(left, right, _, _) if right.resolved && !j.duplicateResolved =>
         j.copy(right = right.withNewPlan(dedupRight(left, right.plan)))
+      // Resolve duplicate output for AsOfJoin.
+      case j @ AsOfJoin(left, right, _, _, _, _) if right.resolved && !j.duplicateResolved =>

Review comment:
       Here we only need `!j.duplicateResolved` (LateralJoin needs the extra `right.resolved` check because it's a UnaryNode)

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
+ *                  FROM right
+ *                  WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance

Review comment:
       Should we make sure tolerance here is non-negative?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)

Review comment:
       `SELECT MIN_BY(STRUCT(right.*), left.t - right.t) AS __nearest_right__`
   
   
   

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
##########
@@ -121,3 +121,24 @@ object LeftSemiOrAnti {
     case _ => None
   }
 }
+
+object AsOfJoinDirection {
+
+  def apply(direction: String): AsOfJoinDirection = {
+    direction.toLowerCase(Locale.ROOT).replace("_", "") match {

Review comment:
       Why do we need to replace `_` with ""?
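
For context, the normalization being questioned can be sketched in Python as follows. The direction names come from the `Backward`, `Forward`, and `Nearest` cases used elsewhere in this PR; the function name `parse_direction` and the error message are hypothetical, and Python's `str.lower()` stands in for `toLowerCase(Locale.ROOT)`.

```python
DIRECTIONS = ("backward", "forward", "nearest")


def parse_direction(direction):
    """Normalize a user-supplied direction string, as in the quoted hunk."""
    # Lower-case and drop underscores before matching.
    normalized = direction.lower().replace("_", "")
    if normalized in DIRECTIONS:
        return normalized
    # Hypothetical message; the quoted hunk does not show the failure branch.
    raise ValueError(f"Unsupported as-of join direction '{direction}'")
```

Since none of the three names contains an underscore, dropping underscores only matters for inputs such as `"back_ward"`, which is what the question above is getting at.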






[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r715072689



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
+ *                  FROM right
+ *                  WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance
+ *             ) as __right__
+ *        FROM left
+ *        )
+ * }}}
+ */
+object RewriteAsOfJoin extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
+    _.containsPattern(AS_OF_JOIN), ruleId) {
+    case AsOfJoin(left, right, asOfCondition, condition, orderExpression, joinType) =>
+      val conditionWithOuterReference =
+        condition.map(And(_, asOfCondition)).getOrElse(asOfCondition).transformUp {
+          case a: AttributeReference if left.outputSet.contains(a) =>
+            OuterReference(a)
+      }
+      val filtered = Filter(conditionWithOuterReference, right)
+
+      val orderExpressionWithOuterReference = orderExpression.transformUp {
+          case a: AttributeReference if left.outputSet.contains(a) =>
+            OuterReference(a)
+        }
+      val rightStruct = CreateStruct(right.output)
+      val nearestRight = MinBy(rightStruct, orderExpressionWithOuterReference)
+        .toAggregateExpression()
+      val aggExpr = Alias(nearestRight, "__nearest_right__")()
+      val aggregate = Aggregate(Seq.empty, Seq(aggExpr), filtered)
+
+      val scalarSubquery = Project(

Review comment:
       sure, updated.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
+ *                  FROM right
+ *                  WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance
+ *             ) as __right__
+ *        FROM left
+ *        )
+ * }}}
+ */
+object RewriteAsOfJoin extends Rule[LogicalPlan] {

Review comment:
       Sure, moved it to a new file.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -185,6 +185,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // PropagateEmptyRelation can change the nullability of an attribute from nullable to
       // non-nullable when an empty relation child of a Union is removed
       UpdateAttributeNullability) ::
+    Batch("Rewrite with Correlated Expressions", Once,
+      RewriteAsOfJoin) ::

Review comment:
       Sounds good. Let me try and move it there.






[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r715131589



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -2122,6 +2125,68 @@ object RewriteIntersectAll extends Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces logical [[AsOfJoin]] operator using a combination of Join and Aggregate operator.
+ *
+ * Input Pseudo-Query:
+ * {{{
+ *    SELECT * FROM left ASOF JOIN right ON (condition, as_of on(left.t, right.t), tolerance)
+ * }}}
+ *
+ * Rewritten Query:
+ * {{{
+ *   SELECT left.*, __right__.*
+ *   FROM (
+ *        SELECT
+ *             left.*,
+ *             (
+ *                  SELECT MIN_BY(STRUCT(right.*), left.t - right.t)
+ *                  FROM right
+ *                  WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance

Review comment:
       That's a good point.
   Should we raise an exception in that case?
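
   For readers following the rewritten query quoted above, here is a minimal plain-Python sketch of its semantics. It is not Spark code: rows are dicts, the column name `t` follows the pseudo-query, and the names `asof_join_backward`, `condition`, and `tolerance` are hypothetical stand-ins chosen for illustration. `None` models the NULL that a scalar subquery yields when no right row qualifies.

```python
def asof_join_backward(left, right, condition, tolerance=None):
    """Model of the rewritten query: for each left row, keep the right row
    that minimizes left.t - right.t among rows passing the WHERE clause."""
    result = []
    for l in left:
        # WHERE condition AND left.t >= right.t AND right.t >= left.t - tolerance
        candidates = [
            r for r in right
            if condition(l, r)
            and l["t"] >= r["t"]
            and (tolerance is None or r["t"] >= l["t"] - tolerance)
        ]
        # MIN_BY(STRUCT(right.*), left.t - right.t); None when nothing qualifies.
        match = min(candidates, key=lambda r: l["t"] - r["t"], default=None)
        # Every left row is kept, as in the outer SELECT of the rewritten query.
        result.append((l, match))
    return result
```

   Each left row appears exactly once in the output, paired either with its closest earlier-or-equal right row or with `None`.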






[GitHub] [spark] AmplabJenkins removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926277456


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143569/
   




[GitHub] [spark] cloud-fan commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r716509623



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
##########
@@ -1597,3 +1597,119 @@ case class LateralJoin(
     copy(left = newChild)
   }
 }
+
+/**
+ * A logical plan for as-of join.
+ */
+case class AsOfJoin(
+    left: LogicalPlan,
+    right: LogicalPlan,
+    asOfCondition: Expression,
+    condition: Option[Expression],
+    joinType: JoinType,
+    orderExpression: Expression,
+    toleranceAssertion: Option[Expression]) extends BinaryNode {
+
+  require(Seq(Inner, LeftOuter).contains(joinType),
+    s"Unsupported as-of join type $joinType")
+
+  override protected def stringArgs: Iterator[Any] = super.stringArgs.take(5)

Review comment:
       Why do we hide the `orderExpression` and `toleranceAssertion`?
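
   To make the effect of `super.stringArgs.take(5)` concrete, here is a small plain-Python model. It assumes the default `stringArgs` yields the constructor arguments in declaration order; the field list is copied from the `AsOfJoin` definition in the diff above, and the helper names `shown_fields` and `hidden_fields` are hypothetical.

```python
from itertools import islice

# Constructor fields of AsOfJoin, in the order they appear in the diff.
AS_OF_JOIN_FIELDS = [
    "left",
    "right",
    "asOfCondition",
    "condition",
    "joinType",
    "orderExpression",
    "toleranceAssertion",
]

def shown_fields(fields, n):
    """Model of `stringArgs.take(n)`: only the first n fields are kept."""
    return list(islice(fields, n))

def hidden_fields(fields, n):
    """The fields that `take(n)` drops."""
    return list(islice(fields, n, None))
```

   With `n = 5`, the last two fields, `orderExpression` and `toleranceAssertion`, are the ones dropped, which matches the two names raised in the question.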






[GitHub] [spark] AmplabJenkins removed a comment on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-923653718


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143468/
   




[GitHub] [spark] ueshin commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
ueshin commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r715126559



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
##########
@@ -121,3 +121,24 @@ object LeftSemiOrAnti {
     case _ => None
   }
 }
+
+object AsOfJoinDirection {
+
+  def apply(direction: String): AsOfJoinDirection = {
+    direction.toLowerCase(Locale.ROOT).replace("_", "") match {

Review comment:
       I just followed what `JoinType` does, but yeah, I don't think we need it.
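
   As an illustration of what the quoted normalization line accepts, here is a plain-Python sketch. The set of direction names is an assumption borrowed from pandas' `merge_asof` (`backward`, `forward`, `nearest`); the match arms are not visible in the diff, and `normalize_direction` and `parse_direction` are hypothetical names.

```python
# Assumption: direction names as in pandas.merge_asof; the actual match arms
# are not shown in the quoted diff.
SUPPORTED_DIRECTIONS = {"backward", "forward", "nearest"}

def normalize_direction(direction: str) -> str:
    # Mirrors direction.toLowerCase(Locale.ROOT).replace("_", "") for ASCII input.
    return direction.lower().replace("_", "")

def parse_direction(direction: str) -> str:
    normalized = normalize_direction(direction)
    if normalized not in SUPPORTED_DIRECTIONS:
        raise ValueError(f"Unsupported as-of join direction: {direction!r}")
    return normalized
```

   Because underscores are removed before matching, a spelling such as `Back_Ward` is accepted alongside `backward`.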






[GitHub] [spark] AmplabJenkins commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-923653718


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/143468/
   




[GitHub] [spark] cloud-fan commented on a change in pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on a change in pull request #34053:
URL: https://github.com/apache/spark/pull/34053#discussion_r715025212



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
##########
@@ -185,6 +185,8 @@ abstract class Optimizer(catalogManager: CatalogManager)
       // PropagateEmptyRelation can change the nullability of an attribute from nullable to
       // non-nullable when an empty relation child of a Union is removed
       UpdateAttributeNullability) ::
+    Batch("Rewrite with Correlated Expressions", Once,
+      RewriteAsOfJoin) ::

Review comment:
       Can we put it in the `Finish Analysis` batch?
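
   For context on what a `Once` batch does mechanically, here is a toy plain-Python model of a rule batch. It is not Catalyst: plans are nested tuples, `run_batch` is a hypothetical helper, and `rewrite_as_of_join` is a simplified stand-in for `RewriteAsOfJoin` that only changes node names.

```python
def run_batch(plan, rules, once=True, max_iterations=100):
    """Apply the rules in order; a single pass if `once`, else until no change."""
    iterations = 1 if once else max_iterations
    for _ in range(iterations):
        before = plan
        for rule in rules:
            plan = rule(plan)
        if plan == before:
            break
    return plan

def rewrite_as_of_join(plan):
    """Toy rule: replace every ("AsOfJoin", left, right) node, bottom-up."""
    if not isinstance(plan, tuple):
        return plan  # leaf relation, represented by its name
    op, *children = plan
    children = [rewrite_as_of_join(child) for child in children]
    if op == "AsOfJoin":
        left, right = children
        # Stand-in for "project left plus a scalar subquery over right".
        return ("Project", left, ("ScalarSubquery", right))
    return (op, *children)
```

   After the batch containing the rewrite has run, later batches in this model never see an `AsOfJoin` node, so where the batch sits in the list decides which rules have to know about the operator.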






[GitHub] [spark] SparkQA commented on pull request #34053: [SPARK-36813][SQL][PYTHON] Propose an infrastructure of as-of join and imlement ps.merge_asof

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34053:
URL: https://github.com/apache/spark/pull/34053#issuecomment-926269120


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48085/
   

