You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/09/05 12:22:44 UTC

[5/5] flink git commit: [FLINK-7227] [table] Fix push-down of disjunctive predicates with more than two terms.

[FLINK-7227] [table] Fix push-down of disjunctive predicates with more than two terms.

This closes #4608.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c11bd7f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c11bd7f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c11bd7f

Branch: refs/heads/master
Commit: 7c11bd7f4127bc550a8fd17fc0fd843b929e3cb2
Parents: 4cf737c
Author: Usman Younas <us...@Usmans-MBP.fritz.box>
Authored: Mon Aug 28 13:44:02 2017 +0000
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 5 14:21:49 2017 +0200

----------------------------------------------------------------------
 .../table/plan/util/RexProgramExtractor.scala   |  7 +-
 .../table/plan/RexProgramExtractorTest.scala    | 73 +++++++++++++++++---
 .../table/utils/TestFilterableTableSource.scala |  1 +
 3 files changed, 72 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c11bd7f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
index bf9a688..53bf8e7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
@@ -20,10 +20,11 @@ package org.apache.flink.table.plan.util
 
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.rex._
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.sql.{SqlFunction, SqlPostfixOperator}
 import org.apache.flink.table.api.TableException
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Expression, Literal, ResolvedFieldReference}
+import org.apache.flink.table.expressions.{And, Expression, Literal, Or, ResolvedFieldReference}
 import org.apache.flink.table.validate.FunctionCatalog
 import org.apache.flink.util.Preconditions
 
@@ -170,6 +171,10 @@ class RexNodeToExpressionConverter(
       None
     } else {
         call.getOperator match {
+          case SqlStdOperatorTable.OR =>
+            Option(operands.reduceLeft(Or))
+          case SqlStdOperatorTable.AND =>
+            Option(operands.reduceLeft(And))
           case function: SqlFunction =>
             lookupFunction(replace(function.getName), operands)
           case postfix: SqlPostfixOperator =>

http://git-wip-us.apache.org/repos/asf/flink/blob/7c11bd7f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
index 840be17..c2a01c6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramExtractorTest.scala
@@ -20,12 +20,13 @@ package org.apache.flink.table.plan
 
 import java.math.BigDecimal
 
-import org.apache.calcite.rex.{RexBuilder, RexProgram, RexProgramBuilder}
+import org.apache.calcite.plan.RelOptUtil
+import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlPostfixOperator
 import org.apache.calcite.sql.`type`.SqlTypeName.{BIGINT, INTEGER, VARCHAR}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.flink.table.expressions._
-import org.apache.flink.table.plan.util.RexProgramExtractor
+import org.apache.flink.table.plan.util.{RexNodeToExpressionConverter, RexProgramExtractor}
 import org.apache.flink.table.utils.InputTypeBuilder.inputOf
 import org.apache.flink.table.validate.FunctionCatalog
 import org.hamcrest.CoreMatchers.is
@@ -33,6 +34,7 @@ import org.junit.Assert.{assertArrayEquals, assertEquals, assertThat}
 import org.junit.Test
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 class RexProgramExtractorTest extends RexProgramTestBase {
 
@@ -104,6 +106,8 @@ class RexProgramExtractorTest extends RexProgramTestBase {
     val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3)
     // 100
     val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+    // 200
+    val t4 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(200L))
 
     // a = amount < 100
     val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0, t3))
@@ -113,15 +117,17 @@ class RexProgramExtractorTest extends RexProgramTestBase {
     val c = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, t2, t3))
     // d = amount <= id
     val d = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, t0, t1))
+    // e = price == 200
+    val e = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, t2, t4))
 
     // a AND b
     val and = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(a, b).asJava))
-    // (a AND b) or c
-    val or = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.OR, List(and, c).asJava))
-    // not d
+    // (a AND b) OR c OR e
+    val or = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.OR, List(and, c, e).asJava))
+    // NOT d
     val not = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.NOT, List(d).asJava))
 
-    // (a AND b) OR c) AND (NOT d)
+    // (a AND b) OR c OR e) AND (NOT d)
     builder.addCondition(builder.addExpr(
       rexBuilder.makeCall(SqlStdOperatorTable.AND, List(or, not).asJava)))
 
@@ -134,14 +140,65 @@ class RexProgramExtractorTest extends RexProgramTestBase {
         functionCatalog)
 
     val expected: Array[Expression] = Array(
-      ExpressionParser.parseExpression("amount < 100 || price == 100"),
-      ExpressionParser.parseExpression("id > 100 || price == 100"),
+      ExpressionParser.parseExpression("amount < 100 || price == 100 || price === 200"),
+      ExpressionParser.parseExpression("id > 100 || price == 100 || price === 200"),
       ExpressionParser.parseExpression("!(amount <= id)"))
     assertExpressionArrayEquals(expected, convertedExpressions)
     assertEquals(0, unconvertedRexNodes.length)
   }
 
   @Test
+  def testExtractANDExpressions(): Unit = {
+    val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
+    val builder = new RexProgramBuilder(inputRowType, rexBuilder)
+
+    // amount
+    val t0 = rexBuilder.makeInputRef(allFieldTypes.get(2), 2)
+    // id
+    val t1 = rexBuilder.makeInputRef(allFieldTypes.get(1), 1)
+    // price
+    val t2 = rexBuilder.makeInputRef(allFieldTypes.get(3), 3)
+    // 100
+    val t3 = rexBuilder.makeExactLiteral(BigDecimal.valueOf(100L))
+
+    // a = amount < 100
+    val a = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN, t0, t3))
+    // b = id > 100
+    val b = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.GREATER_THAN, t1, t3))
+    // c = price == 100
+    val c = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.EQUALS, t2, t3))
+    // d = amount <= id
+    val d = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, t0, t1))
+
+    // a AND b AND c AND d
+    val and = builder.addExpr(rexBuilder.makeCall(SqlStdOperatorTable.AND, List(a, b, c, d).asJava))
+
+    builder.addCondition(builder.addExpr(and))
+
+    val program = builder.getProgram
+    val relBuilder: RexBuilder = new RexBuilder(typeFactory)
+
+    val expanded = program.expandLocalRef(program.getCondition)
+
+    var convertedExpressions = new mutable.ArrayBuffer[Expression]
+    val unconvertedRexNodes = new mutable.ArrayBuffer[RexNode]
+    val inputNames = program.getInputRowType.getFieldNames.asScala.toArray
+    val converter = new RexNodeToExpressionConverter(inputNames, functionCatalog)
+
+    expanded.accept(converter) match {
+      case Some(expression) =>
+        convertedExpressions += expression
+      case None => unconvertedRexNodes += expanded
+    }
+
+    val expected: Array[Expression] = Array(
+      ExpressionParser.parseExpression("amount < 100 && id > 100 && price === 100 && amount <= id"))
+
+    assertExpressionArrayEquals(expected, convertedExpressions.toArray)
+    assertEquals(0, unconvertedRexNodes.length)
+  }
+
+  @Test
   def testExtractArithmeticConditions(): Unit = {
     val inputRowType = typeFactory.createStructType(allFieldTypes, allFieldNames)
     val builder = new RexProgramBuilder(inputRowType, rexBuilder)

http://git-wip-us.apache.org/repos/asf/flink/blob/7c11bd7f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
index dcf2acd..fb99864 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
@@ -89,6 +89,7 @@ class TestFilterableTableSource(
               iterator.remove()
             case (_, _) =>
           }
+        case _ =>
       }
     }