You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/11/13 13:27:48 UTC
[2/2] flink git commit: [FLINK-7657] [table] Add all basic types to RexProgramExtractor
[FLINK-7657] [table] Add all basic types to RexProgramExtractor
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ce1cb8fd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ce1cb8fd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ce1cb8fd
Branch: refs/heads/release-1.4
Commit: ce1cb8fd6d667713be9b5f9ec8f1c394b9ca4644
Parents: bb04187
Author: twalthr <tw...@apache.org>
Authored: Mon Nov 13 14:09:45 2017 +0100
Committer: twalthr <tw...@apache.org>
Committed: Mon Nov 13 14:22:27 2017 +0100
----------------------------------------------------------------------
.../flink/table/expressions/literals.scala | 9 +++--
.../table/plan/util/RexProgramExtractor.scala | 33 +++++++++++++----
.../flink/table/api/TableSourceTest.scala | 25 ++++++-------
.../flink/table/plan/RexProgramTestBase.scala | 1 -
.../runtime/batch/table/TableSourceITCase.scala | 2 +-
.../table/utils/TestFilterableTableSource.scala | 39 ++++++++++----------
.../flink/table/utils/testTableSources.scala | 6 +--
7 files changed, 63 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
index d797cc4..e6905ef 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala
@@ -108,10 +108,11 @@ case class Literal(value: Any, resultType: TypeInformation[_]) extends LeafExpre
}
/**
- * Convert a date value to a calendar. Calcite fromCalendarField functions use the Calendar.get
- * methods, so the raw values of the individual fields are preserved when converted to the
- * string formats.
- * @return Get the Calendar value
+ * Convert a Date value to a Calendar. Calcite's fromCalendarField functions use the
+ * Calendar.get methods, so the raw values of the individual fields are preserved when
+ * converted to the String formats.
+ *
+ * @return get the Calendar value
*/
private def valueAsCalendar: Calendar = {
val date = value.asInstanceOf[java.util.Date]
http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
index d11a43d..9c06135 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
@@ -165,25 +165,42 @@ class RexNodeToExpressionConverter(
val literalType = FlinkTypeFactory.toTypeInfo(literal.getType)
val literalValue = literalType match {
- // Chrono use cases.
+
case _@SqlTimeTypeInfo.DATE =>
val rexValue = literal.getValueAs(classOf[DateString])
Date.valueOf(rexValue.toString)
+
case _@SqlTimeTypeInfo.TIME =>
val rexValue = literal.getValueAs(classOf[TimeString])
Time.valueOf(rexValue.toString(0))
+
case _@SqlTimeTypeInfo.TIMESTAMP =>
val rexValue = literal.getValueAs(classOf[TimestampString])
Timestamp.valueOf(rexValue.toString(3))
+ case _@BasicTypeInfo.BYTE_TYPE_INFO =>
+ // convert from BigDecimal to Byte
+ literal.getValueAs(classOf[java.lang.Byte])
+
+ case _@BasicTypeInfo.SHORT_TYPE_INFO =>
+ // convert from BigDecimal to Short
+ literal.getValueAs(classOf[java.lang.Short])
+
case _@BasicTypeInfo.INT_TYPE_INFO =>
- /*
- Force integer conversion. RelDataType is INTEGER and SqlTypeName is DECIMAL,
- meaning that it will assume that we are using a BigDecimal
- and refuse to convert to Integer.
- */
- val rexValue = literal.getValueAs(classOf[java.math.BigDecimal])
- rexValue.intValue()
+ // convert from BigDecimal to Integer
+ literal.getValueAs(classOf[java.lang.Integer])
+
+ case _@BasicTypeInfo.LONG_TYPE_INFO =>
+ // convert from BigDecimal to Long
+ literal.getValueAs(classOf[java.lang.Long])
+
+ case _@BasicTypeInfo.FLOAT_TYPE_INFO =>
+ // convert from BigDecimal to Float
+ literal.getValueAs(classOf[java.lang.Float])
+
+ case _@BasicTypeInfo.DOUBLE_TYPE_INFO =>
+ // convert from BigDecimal to Double
+ literal.getValueAs(classOf[java.lang.Double])
case _ => literal.getValue
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
index dc84c19..42f0769 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/TableSourceTest.scala
@@ -18,19 +18,18 @@
package org.apache.flink.table.api
+import _root_.java.sql.{Date, Time, Timestamp}
+
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.scala._
-import org.apache.flink.table.expressions.{BinaryComparison, Expression, Literal}
import org.apache.flink.table.expressions.utils._
import org.apache.flink.table.runtime.utils.CommonTestData
import org.apache.flink.table.sources.{CsvTableSource, TableSource}
import org.apache.flink.table.utils.TableTestUtil._
import org.apache.flink.table.utils.{TableTestBase, TestFilterableTableSource}
-import org.junit.{Assert, Test}
-import _root_.java.sql.{Date, Time, Timestamp}
-
-import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.types.Row
+import org.junit.{Assert, Test}
class TableSourceTest extends TableTestBase {
@@ -225,7 +224,8 @@ class TableSourceTest extends TableTestBase {
val result = tableEnv
.scan(tableName)
.select('price, 'id, 'amount)
- .where("amount > 2 && (amount < 32 || amount.cast(LONG) > 10)") // cast can not be converted
+ .where("amount > 2 && id < 1.2 && " +
+ "(amount < 32 || amount.cast(LONG) > 10)") // cast can not be converted
val expected = unaryNode(
"DataSetCalc",
@@ -234,7 +234,7 @@ class TableSourceTest extends TableTestBase {
Array("price", "id", "amount"),
"'amount > 2"),
term("select", "price", "id", "amount"),
- term("where", "OR(<(amount, 32), >(CAST(amount), 10))")
+ term("where", "AND(<(id, 1.2E0), OR(<(amount, 32), >(CAST(amount), 10)))")
)
util.verifyTable(result, expected)
}
@@ -403,13 +403,10 @@ class TableSourceTest extends TableTestBase {
"'tv > 14:25:02.toTime && " +
"'dv > 2017-02-03.toDate && " +
"'tsv > 2017-02-03 14:25:02.0.toTimestamp"
- val expected = unaryNode(
- "DataSetCalc",
- batchFilterableSourceTableNode(
- tableName,
- Array("id", "dv", "tv", "tsv"),
- expectedFilter),
- term("select", "id")
+ val expected = batchFilterableSourceTableNode(
+ tableName,
+ Array("id"),
+ expectedFilter
)
util.verifyTable(result, expected)
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala
index 728694f..05870ca 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/RexProgramTestBase.scala
@@ -80,5 +80,4 @@ abstract class RexProgramTestBase {
protected def makeTypes(fieldTypes: SqlTypeName*): java.util.List[RelDataType] = {
fieldTypes.toList.map(typeFactory.createSqlType).asJava
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
index f0fe896..eb73f1b 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/TableSourceITCase.scala
@@ -528,7 +528,7 @@ class TableSourceITCase(
new TestProjectableTableSource(tableSchema, returnType, data, "rtime", "ptime"))
val results = tEnv.scan("T")
- .select('ptime > 0)
+ .select('ptime.cast(Types.LONG) > 0)
.select(1.count)
.collect()
http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
index ae2b1d6..da8220d 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/TestFilterableTableSource.scala
@@ -35,6 +35,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable
object TestFilterableTableSource {
+
/**
* @return The default filterable table source.
*/
@@ -44,15 +45,17 @@ object TestFilterableTableSource {
/**
* A filterable data source with custom data.
- * @param rowTypeInfo The type of the data.
- * Its expected that both types and field names are provided
+ * @param rowTypeInfo The type of the data. It's expected that both types and field
+ * names are provided.
* @param rows The data as a sequence of rows.
* @param filterableFields The fields that are allowed to be filtered on.
* @return The table source.
*/
- def apply(rowTypeInfo: RowTypeInfo,
- rows: Seq[Row],
- filterableFields: Set[String]): TestFilterableTableSource = {
+ def apply(
+ rowTypeInfo: RowTypeInfo,
+ rows: Seq[Row],
+ filterableFields: Set[String])
+ : TestFilterableTableSource = {
new TestFilterableTableSource(rowTypeInfo, rows, filterableFields)
}
@@ -64,24 +67,20 @@ object TestFilterableTableSource {
new RowTypeInfo(fieldTypes, fieldNames)
}
-
private lazy val defaultRows: Seq[Row] = {
for {
cnt <- 0 until 33
} yield {
Row.of(
s"Record_$cnt",
- cnt.toLong.asInstanceOf[Object],
- cnt.toInt.asInstanceOf[Object],
- cnt.toDouble.asInstanceOf[Object])
+ cnt.toLong.asInstanceOf[AnyRef],
+ cnt.toInt.asInstanceOf[AnyRef],
+ cnt.toDouble.asInstanceOf[AnyRef])
}
}
}
-
/**
- *
- *
* A data source that implements some very basic filtering in-memory in order to test
* expression push-down logic.
*
@@ -91,11 +90,12 @@ object TestFilterableTableSource {
* @param filterPredicates The predicates that should be used to filter.
* @param filterPushedDown Whether predicates have been pushed down yet.
*/
-class TestFilterableTableSource(rowTypeInfo: RowTypeInfo,
- data: Seq[Row],
- filterableFields: Set[String] = Set(),
- filterPredicates: Seq[Expression] = Seq(),
- val filterPushedDown: Boolean = false)
+class TestFilterableTableSource(
+ rowTypeInfo: RowTypeInfo,
+ data: Seq[Row],
+ filterableFields: Set[String] = Set(),
+ filterPredicates: Seq[Expression] = Seq(),
+ val filterPushedDown: Boolean = false)
extends BatchTableSource[Row]
with StreamTableSource[Row]
with FilterableTableSource[Row] {
@@ -195,8 +195,9 @@ class TestFilterableTableSource(rowTypeInfo: RowTypeInfo,
}
}
- private def extractValues(expr: BinaryComparison,
- row: Row): (Comparable[Any], Comparable[Any]) = {
+ private def extractValues(expr: BinaryComparison, row: Row)
+ : (Comparable[Any], Comparable[Any]) = {
+
(expr.left, expr.right) match {
case (l: ResolvedFieldReference, r: Literal) =>
val idx = rowTypeInfo.getFieldIndex(l.name)
http://git-wip-us.apache.org/repos/asf/flink/blob/ce1cb8fd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
index c2eba32..a546919 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/testTableSources.scala
@@ -22,21 +22,17 @@ import java.util
import java.util.{Collections, List => JList}
import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
import org.apache.flink.streaming.api.datastream.DataStream
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
-import org.apache.flink.table.api.Types.{DOUBLE, INT, LONG, STRING}
import org.apache.flink.table.api.TableSchema
-import org.apache.flink.table.expressions._
import org.apache.flink.table.sources._
import org.apache.flink.table.sources.tsextractors.ExistingField
import org.apache.flink.table.sources.wmstrategies.AscendingTimestamps
import org.apache.flink.types.Row
-import org.apache.flink.util.Preconditions
import scala.collection.JavaConverters._
-import scala.collection.mutable
class TestTableSourceWithTime[T](
tableSchema: TableSchema,