You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/02/03 08:02:07 UTC

spark git commit: [SPARK-23311][SQL][TEST] add FilterFunction test case for test CombineTypedFilters

Repository: spark
Updated Branches:
  refs/heads/master fe73cb4b4 -> 63b49fa2e


[SPARK-23311][SQL][TEST] add FilterFunction test case for test CombineTypedFilters

## What changes were proposed in this pull request?

The existing test cases for CombineTypedFilters do not cover FilterFunction, so this change adds them.
In addition, the existing test cases in TypedFilterOptimizationSuite now share a common LocalRelation.

## How was this patch tested?

Added new test cases.

Author: caoxuewen <ca...@zte.com.cn>

Closes #20482 from heary-cao/TypedFilterOptimizationSuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/63b49fa2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/63b49fa2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/63b49fa2

Branch: refs/heads/master
Commit: 63b49fa2e599080c2ba7d5189f9dde20a2e01fb4
Parents: fe73cb4
Author: caoxuewen <ca...@zte.com.cn>
Authored: Sat Feb 3 00:02:03 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Feb 3 00:02:03 2018 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/dsl/package.scala |  3 +
 .../TypedFilterOptimizationSuite.scala          | 95 +++++++++++++++++---
 2 files changed, 84 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/63b49fa2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 59cb26d..efb2eba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -21,6 +21,7 @@ import java.sql.{Date, Timestamp}
 
 import scala.language.implicitConversions
 
+import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
@@ -301,6 +302,8 @@ package object dsl {
 
       def filter[T : Encoder](func: T => Boolean): LogicalPlan = TypedFilter(func, logicalPlan)
 
+      def filter[T : Encoder](func: FilterFunction[T]): LogicalPlan = TypedFilter(func, logicalPlan)
+
       def serialize[T : Encoder]: LogicalPlan = CatalystSerde.serialize[T](logicalPlan)
 
       def deserialize[T : Encoder]: LogicalPlan = CatalystSerde.deserialize[T](logicalPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/63b49fa2/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
index 56f096f..5fc99a3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/TypedFilterOptimizationSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.spark.api.java.function.FilterFunction
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
@@ -38,18 +39,19 @@ class TypedFilterOptimizationSuite extends PlanTest {
 
   implicit private def productEncoder[T <: Product : TypeTag] = ExpressionEncoder[T]()
 
+  val testRelation = LocalRelation('_1.int, '_2.int)
+
   test("filter after serialize with the same object type") {
-    val input = LocalRelation('_1.int, '_2.int)
     val f = (i: (Int, Int)) => i._1 > 0
 
-    val query = input
+    val query = testRelation
       .deserialize[(Int, Int)]
       .serialize[(Int, Int)]
       .filter(f).analyze
 
     val optimized = Optimize.execute(query)
 
-    val expected = input
+    val expected = testRelation
       .deserialize[(Int, Int)]
       .where(callFunction(f, BooleanType, 'obj))
       .serialize[(Int, Int)].analyze
@@ -58,10 +60,9 @@ class TypedFilterOptimizationSuite extends PlanTest {
   }
 
   test("filter after serialize with different object types") {
-    val input = LocalRelation('_1.int, '_2.int)
     val f = (i: OtherTuple) => i._1 > 0
 
-    val query = input
+    val query = testRelation
       .deserialize[(Int, Int)]
       .serialize[(Int, Int)]
       .filter(f).analyze
@@ -70,17 +71,16 @@ class TypedFilterOptimizationSuite extends PlanTest {
   }
 
   test("filter before deserialize with the same object type") {
-    val input = LocalRelation('_1.int, '_2.int)
     val f = (i: (Int, Int)) => i._1 > 0
 
-    val query = input
+    val query = testRelation
       .filter(f)
       .deserialize[(Int, Int)]
       .serialize[(Int, Int)].analyze
 
     val optimized = Optimize.execute(query)
 
-    val expected = input
+    val expected = testRelation
       .deserialize[(Int, Int)]
       .where(callFunction(f, BooleanType, 'obj))
       .serialize[(Int, Int)].analyze
@@ -89,10 +89,9 @@ class TypedFilterOptimizationSuite extends PlanTest {
   }
 
   test("filter before deserialize with different object types") {
-    val input = LocalRelation('_1.int, '_2.int)
     val f = (i: OtherTuple) => i._1 > 0
 
-    val query = input
+    val query = testRelation
       .filter(f)
       .deserialize[(Int, Int)]
       .serialize[(Int, Int)].analyze
@@ -101,21 +100,89 @@ class TypedFilterOptimizationSuite extends PlanTest {
   }
 
   test("back to back filter with the same object type") {
-    val input = LocalRelation('_1.int, '_2.int)
     val f1 = (i: (Int, Int)) => i._1 > 0
     val f2 = (i: (Int, Int)) => i._2 > 0
 
-    val query = input.filter(f1).filter(f2).analyze
+    val query = testRelation.filter(f1).filter(f2).analyze
     val optimized = Optimize.execute(query)
     assert(optimized.collect { case t: TypedFilter => t }.length == 1)
   }
 
   test("back to back filter with different object types") {
-    val input = LocalRelation('_1.int, '_2.int)
     val f1 = (i: (Int, Int)) => i._1 > 0
     val f2 = (i: OtherTuple) => i._2 > 0
 
-    val query = input.filter(f1).filter(f2).analyze
+    val query = testRelation.filter(f1).filter(f2).analyze
+    val optimized = Optimize.execute(query)
+    assert(optimized.collect { case t: TypedFilter => t }.length == 2)
+  }
+
+  test("back to back FilterFunction with the same object type") {
+    val f1 = new FilterFunction[(Int, Int)] {  // applied first; checks the first tuple field
+      override def call(value: (Int, Int)): Boolean = value._1 > 0
+    }
+    val f2 = new FilterFunction[(Int, Int)] {  // applied second; checks the second tuple field
+      override def call(value: (Int, Int)): Boolean = value._2 > 0
+    }
+
+    val query = testRelation.filter(f1).filter(f2).analyze
+    val optimized = Optimize.execute(query)
+    assert(optimized.collect { case t: TypedFilter => t }.length == 1)  // expect one TypedFilter
+  }
+
+  test("back to back FilterFunction with different object types") {
+    val f1 = new FilterFunction[(Int, Int)] {  // applied first; typed on (Int, Int)
+      override def call(value: (Int, Int)): Boolean = value._1 > 0
+    }
+    val f2 = new FilterFunction[OtherTuple] {  // applied second; typed on OtherTuple
+      override def call(value: OtherTuple): Boolean = value._2 > 0
+    }
+
+    val query = testRelation.filter(f1).filter(f2).analyze
+    val optimized = Optimize.execute(query)
+    assert(optimized.collect { case t: TypedFilter => t }.length == 2)  // expect two TypedFilters
+  }
+
+  test("FilterFunction and filter with the same object type") {
+    val f1 = new FilterFunction[(Int, Int)] {  // applied first: a FilterFunction instance
+      override def call(value: (Int, Int)): Boolean = value._1 > 0
+    }
+    val f2 = (i: (Int, Int)) => i._2 > 0  // applied second: a plain Scala function
+
+    val query = testRelation.filter(f1).filter(f2).analyze
+    val optimized = Optimize.execute(query)
+    assert(optimized.collect { case t: TypedFilter => t }.length == 1)  // expect one TypedFilter
+  }
+
+  test("FilterFunction and filter with different object types") {
+    val f1 = new FilterFunction[(Int, Int)] {  // applied first: a FilterFunction instance
+      override def call(value: (Int, Int)): Boolean = value._1 > 0
+    }
+    val f2 = (i: OtherTuple) => i._2 > 0  // applied second: a plain Scala function on OtherTuple
+
+    val query = testRelation.filter(f1).filter(f2).analyze
+    val optimized = Optimize.execute(query)
+    assert(optimized.collect { case t: TypedFilter => t }.length == 2)  // expect two TypedFilters
+  }
+
+  test("filter and FilterFunction with the same object type") {
+    val f1 = (i: (Int, Int)) => i._1 > 0  // applied first: a plain Scala function
+    val f2 = new FilterFunction[(Int, Int)] {  // applied second: a FilterFunction instance
+      override def call(value: (Int, Int)): Boolean = value._2 > 0
+    }
+
+    val query = testRelation.filter(f1).filter(f2).analyze
+    val optimized = Optimize.execute(query)
+    assert(optimized.collect { case t: TypedFilter => t }.length == 1)  // expect one TypedFilter
+  }
+
+  test("filter and FilterFunction with different object types") {
+    val f1 = (i: (Int, Int)) => i._1 > 0  // applied first: a plain Scala function
+    val f2 = new FilterFunction[OtherTuple] {  // applied second: a FilterFunction on OtherTuple
+      override def call(value: OtherTuple): Boolean = value._2 > 0
+    }
+
+    val query = testRelation.filter(f1).filter(f2).analyze
     val optimized = Optimize.execute(query)
     assert(optimized.collect { case t: TypedFilter => t }.length == 2)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org