Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/13 13:27:48 UTC

[2/2] flink git commit: [FLINK-7657] [table] Add all basic types to RexProgramExtractor

[FLINK-7657] [table] Add all basic types to RexProgramExtractor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce1cb8fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce1cb8fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce1cb8fd

Branch: refs/heads/release-1.4
Commit: ce1cb8fd6d667713be9b5f9ec8f1c394b9ca4644
Parents: bb04187
Author: twalthr <tw...@apache.org>
Authored: Mon Nov 13 14:09:45 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Mon Nov 13 14:22:27 2017 +0100

----------------------------------------------------------------------
 .../flink/table/expressions/literals.scala      |  9 +++--
 .../table/plan/util/RexProgramExtractor.scala   | 33 +++++++++++++----
 .../flink/table/api/TableSourceTest.scala       | 25 ++++++-------
 .../flink/table/plan/RexProgramTestBase.scala   |  1 -
 .../runtime/batch/table/TableSourceITCase.scala |  2 +-
 .../table/utils/TestFilterableTableSource.scala | 39 ++++++++++----------
 .../flink/table/utils/testTableSources.scala    |  6 +--
 7 files changed, 63 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index d797cc4..e6905ef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -108,10 +108,11 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
   }
 
   /**
-    * Convert a date value to a calendar.  Calcite fromCalendarField functions use the Calendar.get
-    * methods, so the raw values of the individual fields are preserved when converted to the
-    * string formats.
-    * @return Get the Calendar value
+    * Convert a Date value to a Calendar. Calcite's fromCalendarField functions use the
+    * Calendar.get methods, so the raw values of the individual fields are preserved when
+    * converted to the String formats.
+    *
+    * @return the Calendar value
     */
   private def valueAsCalendar: Calendar = {
     val date = value.asInstanceOf[java.util.Date]
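
For context, the conversion this doc comment describes amounts to wrapping the java.util.Date in a Calendar so that Calendar.get exposes the raw field values Calcite's fromCalendarField functions read. A minimal self-contained sketch (the real method body continues past this hunk, so its exact shape is an assumption):

    import java.util.{Calendar, Date}

    // Calcite's fromCalendarField functions read fields via Calendar.get,
    // so wrapping the Date preserves the raw values of the individual fields.
    def valueAsCalendar(date: Date): Calendar = {
      val calendar = Calendar.getInstance()
      calendar.setTime(date)
      calendar
    }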

http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
index d11a43d..9c06135 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
@@ -165,25 +165,42 @@ class RexNodeToExpressionConverter(
     val literalType = FlinkTypeFactory.toTypeInfo(literal.getType)
 
     val literalValue = literalType match {
-      // Chrono use cases.
+
       case _@SqlTimeTypeInfo.DATE =>
         val rexValue = literal.getValueAs(classOf[DateString])
         Date.valueOf(rexValue.toString)
+
       case _@SqlTimeTypeInfo.TIME =>
         val rexValue = literal.getValueAs(classOf[TimeString])
         Time.valueOf(rexValue.toString(0))
+
       case _@SqlTimeTypeInfo.TIMESTAMP =>
         val rexValue = literal.getValueAs(classOf[TimestampString])
         Timestamp.valueOf(rexValue.toString(3))
 
+      case _@BasicTypeInfo.BYTE_TYPE_INFO =>
+        // convert from BigDecimal to Byte
+        literal.getValueAs(classOf[java.lang.Byte])
+
+      case _@BasicTypeInfo.SHORT_TYPE_INFO =>
+        // convert from BigDecimal to Short
+        literal.getValueAs(classOf[java.lang.Short])
+
       case _@BasicTypeInfo.INT_TYPE_INFO =>
-        /*
-        Force integer conversion.  RelDataType is INTEGER and SqlTypeName is DECIMAL,
-        meaning that it will assume that we are using a BigDecimal
-        and refuse to convert to Integer.
-         */
-        val rexValue = literal.getValueAs(classOf[java.math.BigDecimal])
-        rexValue.intValue()
+        // convert from BigDecimal to Integer
+        literal.getValueAs(classOf[java.lang.Integer])
+
+      case _@BasicTypeInfo.LONG_TYPE_INFO =>
+        // convert from BigDecimal to Long
+        literal.getValueAs(classOf[java.lang.Long])
+
+      case _@BasicTypeInfo.FLOAT_TYPE_INFO =>
+        // convert from BigDecimal to Float
+        literal.getValueAs(classOf[java.lang.Float])
+
+      case _@BasicTypeInfo.DOUBLE_TYPE_INFO =>
+        // convert from BigDecimal to Double
+        literal.getValueAs(classOf[java.lang.Double])
 
       case _ => literal.getValue
     }
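
The key point of this hunk: Calcite keeps numeric literals as java.math.BigDecimal internally regardless of the declared RelDataType, so each basic type must be requested explicitly via RexLiteral#getValueAs, which performs the narrowing conversion. A minimal sketch of the pattern in isolation (the literal would come from a RexProgram, as in the converter above):

    import org.apache.calcite.rex.RexLiteral

    // getValueAs converts the internal BigDecimal representation to the
    // requested boxed type; no manual intValue()/longValue() is needed.
    def toBoxedLong(literal: RexLiteral): java.lang.Long =
      literal.getValueAs(classOf[java.lang.Long])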

http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index dc84c19..42f0769 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -18,19 +18,18 @@
 
 package org.apache.flink.table.api
 
+import _root_.java.sql.{Date, Time, Timestamp}
+
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.{BinaryComparison, Expression, Literal}
 import org.apache.flink.table.expressions.utils._
 import org.apache.flink.table.runtime.utils.CommonTestData
 import org.apache.flink.table.sources.{CsvTableSource, TableSource}
 import org.apache.flink.table.utils.TableTestUtil._
 import org.apache.flink.table.utils.{TableTestBase, TestFilterableTableSource}
-import org.junit.{Assert, Test}
-import _root_.java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.types.Row
+import org.junit.{Assert, Test}
 
 class TableSourceTest extends TableTestBase {
 
@@ -225,7 +224,8 @@ class TableSourceTest extends TableTestBase {
     val result = tableEnv
         .scan(tableName)
         .select('price, 'id, 'amount)
-        .where("amount > 2 && (amount < 32 || amount.cast(LONG) > 10)") // cast can not be converted
+        .where("amount > 2 && id < 1.2 && " +
+          "(amount < 32 || amount.cast(LONG) > 10)") // cast can not be converted
 
     val expected = unaryNode(
       "DataSetCalc",
@@ -234,7 +234,7 @@ class TableSourceTest extends TableTestBase {
         Array("price", "id", "amount"),
         "'amount > 2"),
       term("select", "price", "id", "amount"),
-      term("where", "OR(<(amount, 32), >(CAST(amount), 10))")
+      term("where", "AND(<(id, 1.2E0), OR(<(amount, 32), >(CAST(amount), 10)))")
     )
     util.verifyTable(result, expected)
   }
@@ -403,13 +403,10 @@ class TableSourceTest extends TableTestBase {
         "'tv > 14:25:02.toTime && " +
         "'dv > 2017-02-03.toDate && " +
         "'tsv > 2017-02-03 14:25:02.0.toTimestamp"
-    val expected = unaryNode(
-      "DataSetCalc",
-      batchFilterableSourceTableNode(
-        tableName,
-        Array("id", "dv", "tv", "tsv"),
-        expectedFilter),
-      term("select", "id")
+    val expected = batchFilterableSourceTableNode(
+      tableName,
+      Array("id"),
+      expectedFilter
     )
     util.verifyTable(result, expected)
   }
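
The reworked expectation reflects that DATE, TIME, and TIMESTAMP literals can now be converted, so the whole predicate is handed to the filterable source and the wrapping DataSetCalc disappears from the plan. An illustrative Scala Table API query of the same shape (column names follow the test; this is a sketch, not the test's exact string-expression syntax):

    // All three comparisons now convert to Expressions and are pushed
    // into the TableSource, leaving only the source scan in the plan.
    val result = tableEnv
      .scan(tableName)
      .where('tv > "14:25:02".toTime && 'dv > "2017-02-03".toDate)
      .select('id)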

http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala
index 728694f..05870ca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala
@@ -80,5 +80,4 @@ abstract class RexProgramTestBase {
   protected def makeTypes(fieldTypes: SqlTypeName*): java.util.List[RelDataType] = {
     fieldTypes.toList.map(typeFactory.createSqlType).asJava
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
index f0fe896..eb73f1b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
@@ -528,7 +528,7 @@ class TableSourceITCase(
       new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
 
     val results = tEnv.scan("T")
-      .select('ptime > 0)
+      .select('ptime.cast(Types.LONG) > 0)
       .select(1.count)
       .collect()
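
A related adjustment in this test: the processing-time attribute is now compared through an explicit cast rather than directly against an integer literal. The adjusted pattern in isolation (tEnv and the registered table "T" are set up earlier in the test; Types is org.apache.flink.table.api.Types):

    import org.apache.flink.table.api.Types
    import org.apache.flink.table.api.scala._

    // Cast the proctime attribute to LONG before the numeric comparison
    // instead of comparing the raw time indicator to an Int literal.
    val results = tEnv.scan("T")
      .select('ptime.cast(Types.LONG) > 0)
      .select(1.count)
      .collect()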
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
index ae2b1d6..da8220d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
@@ -35,6 +35,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 object TestFilterableTableSource {
+
   /**
     * @return The default filterable table source.
     */
@@ -44,15 +45,17 @@ object TestFilterableTableSource {
 
   /**
     * A filterable data source with custom data.
-    * @param rowTypeInfo The type of the data.
-    *                    Its expected that both types and field names are provided
+    * @param rowTypeInfo The type of the data. It's expected that both types and field
+    *                    names are provided.
     * @param rows The data as a sequence of rows.
     * @param filterableFields The fields that are allowed to be filtered on.
     * @return The table source.
     */
-  def apply(rowTypeInfo: RowTypeInfo,
-            rows: Seq[Row],
-            filterableFields: Set[String]): TestFilterableTableSource = {
+  def apply(
+      rowTypeInfo: RowTypeInfo,
+      rows: Seq[Row],
+      filterableFields: Set[String])
+    : TestFilterableTableSource = {
     new TestFilterableTableSource(rowTypeInfo, rows, filterableFields)
   }
 
@@ -64,24 +67,20 @@ object TestFilterableTableSource {
     new RowTypeInfo(fieldTypes, fieldNames)
   }
 
-
   private lazy val defaultRows: Seq[Row] = {
     for {
       cnt <- 0 until 33
     } yield {
       Row.of(
         s"Record_$cnt",
-        cnt.toLong.asInstanceOf[Object],
-        cnt.toInt.asInstanceOf[Object],
-        cnt.toDouble.asInstanceOf[Object])
+        cnt.toLong.asInstanceOf[AnyRef],
+        cnt.toInt.asInstanceOf[AnyRef],
+        cnt.toDouble.asInstanceOf[AnyRef])
     }
   }
 }
 
-
 /**
-  *
-  *
   * A data source that implements some very basic filtering in-memory in order to test
   * expression push-down logic.
   *
@@ -91,11 +90,12 @@ object TestFilterableTableSource {
   * @param filterPredicates The predicates that should be used to filter.
   * @param filterPushedDown Whether predicates have been pushed down yet.
   */
-class TestFilterableTableSource(rowTypeInfo: RowTypeInfo,
-                                data: Seq[Row],
-                                filterableFields: Set[String] = Set(),
-                                filterPredicates: Seq[Expression] = Seq(),
-                                val filterPushedDown: Boolean = false)
+class TestFilterableTableSource(
+    rowTypeInfo: RowTypeInfo,
+    data: Seq[Row],
+    filterableFields: Set[String] = Set(),
+    filterPredicates: Seq[Expression] = Seq(),
+    val filterPushedDown: Boolean = false)
   extends BatchTableSource[Row]
     with StreamTableSource[Row]
     with FilterableTableSource[Row] {
@@ -195,8 +195,9 @@ class TestFilterableTableSource(rowTypeInfo: RowTypeInfo,
     }
   }
 
-  private def extractValues(expr: BinaryComparison,
-                            row: Row): (Comparable[Any], Comparable[Any]) = {
+  private def extractValues(expr: BinaryComparison, row: Row)
+    : (Comparable[Any], Comparable[Any]) = {
+
     (expr.left, expr.right) match {
       case (l: ResolvedFieldReference, r: Literal) =>
         val idx = rowTypeInfo.getFieldIndex(l.name)
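
For reference, constructing the source through the reworked apply method could look like this (the field names and rows here are made up for illustration):

    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.typeutils.RowTypeInfo
    import org.apache.flink.types.Row

    val rowType = new RowTypeInfo(
      Array[TypeInformation[_]](
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.LONG_TYPE_INFO),
      Array("name", "id"))

    // Only predicates on "id" may be pushed into this source.
    val source = TestFilterableTableSource(
      rowType,
      Seq(Row.of("a", Long.box(1L)), Row.of("b", Long.box(2L))),
      Set("id"))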

http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
index c2eba32..a546919 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
@@ -22,21 +22,17 @@ import java.util
 import java.util.{Collections, List => JList}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.Types.{DOUBLE, INT, LONG, STRING}
 import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.expressions._
 import org.apache.flink.table.sources._
 import org.apache.flink.table.sources.tsextractors.ExistingField
 import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
 import org.apache.flink.types.Row
-import org.apache.flink.util.Preconditions
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
 
 class TestTableSourceWithTime[T](
     tableSchema: TableSchema,