Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/10/04 11:24:00 UTC
[jira] [Commented] (FLINK-7657) SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191135#comment-16191135 ]
ASF GitHub Bot commented on FLINK-7657:
---------------------------------------
Github user twalthr commented on a diff in the pull request:
https://github.com/apache/flink/pull/4746#discussion_r142633894
--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CheckExpressionsTableSource.scala ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.utils
+
+import org.apache.flink.api.java.typeutils.RowTypeInfo
+import org.apache.flink.api.java.{DataSet, ExecutionEnvironment}
+import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, StreamTableSource}
+import org.apache.flink.types.Row
+
+import java.util
+import java.util.Collections
+
+import scala.collection.JavaConverters._
+
+/**
+ * A table source that takes in assertions and applies them when applyPredicate is called.
+ * Allows for testing that expression push-downs are handled properly.
+ * @param typeInfo The type info.
+ * @param assertions A set of assertions as a function reference
+ * @param pushedDown Whether this has been pushed down.
+ */
+class CheckExpressionsTableSource(typeInfo: RowTypeInfo,
--- End diff --
Maybe it would be better to generalize `TestFilterableTableSource`.
> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> ------------------------------------------------------------------------------
>
> Key: FLINK-7657
> URL: https://issues.apache.org/jira/browse/FLINK-7657
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.3.1, 1.3.2
> Reporter: Kent Murra
> Assignee: Kent Murra
> Priority: Critical
>
> I have a SQL statement using the Table API that contains a timestamp literal. When the execution environment tries to optimize the SQL, it causes an exception (attached below). As a result, any SQL query with a timestamp, date, or time literal cannot be executed if any table source involved implements FilterableTableSource.
> {code:none}
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule PushFilterIntoTableSourceScanRule, args [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], fields:(data, last_updated))]
> at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
> at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
> at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
> at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
> at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
> at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
> at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
> at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
> at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
> at com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
> at com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
> at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.App$$anonfun$main$1.apply(App.scala:76)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> at scala.App$class.main(App.scala:76)
> at com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
> at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be cast to java.util.Date
> at org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
> at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
> at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
> at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.immutable.List.foreach(List.scala:381)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.immutable.List.map(List.scala:285)
> at org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
> at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
> at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:92)
> at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:56)
> at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:211)
> ... 19 more
> {code}
> I've done quite a bit of debugging on this and tracked it down to a problem with the way the Calcite AST is translated into an Expression tree for the predicates. Calcite parses timestamps as Calendar values, and you'll note in [RexNodeToExpressionConverter|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala#L160] that the Calendar value is passed as-is to the [Literal|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L54], which does no conversion of the value. The Literal, in turn, [expects the value to be a java.sql.Date subclass|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L106], which is where the exception arises.
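> To make the mismatch concrete, the kind of conversion involved is roughly the following (a minimal sketch only; the object and helper names and the dispatch on type information are illustrative, not the actual patch, and time-zone handling is glossed over):
> {code:none}
> import java.sql.{Date, Time, Timestamp}
> import java.util.Calendar
>
> import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
>
> object CalendarConversionSketch {
>
>   // Map the Calendar that Calcite produces for a time literal onto the
>   // java.sql.* value that Literal expects, based on the literal's type info.
>   def calendarToSqlValue(calendar: Calendar, literalType: TypeInformation[_]): Any =
>     literalType match {
>       case SqlTimeTypeInfo.DATE      => new Date(calendar.getTimeInMillis)
>       case SqlTimeTypeInfo.TIME      => new Time(calendar.getTimeInMillis)
>       case SqlTimeTypeInfo.TIMESTAMP => new Timestamp(calendar.getTimeInMillis)
>       case _                         => calendar
>     }
> }
> {code}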
> I've done some informal testing of a bugfix where I convert the calendars to java.sql.Date/java.sql.Time/java.sql.Timestamp in RexNodeToExpressionConverter and had good results. Here is some reproduction code in Scala. I am using Flink version 1.3.2 and running it in local mode (right-click + Run As in IntelliJ).
> {code:none}
> package kmurra
>
> import java.sql.Date
> import java.util
>
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
> import org.apache.flink.api.java
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.scala.ExecutionEnvironment
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala.BatchTableEnvironment
> import org.apache.flink.table.expressions.Expression
> import org.apache.flink.table.sinks.{BatchTableSink, TableSinkBase}
> import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, TableSource}
> import org.apache.flink.types.Row
>
> import scala.collection.mutable.ListBuffer
> import scala.collection.JavaConversions._
>
> object TestReproductionApp extends App {
>   val tables: BatchTableEnvironment = TableEnvironment.getTableEnvironment(ExecutionEnvironment.getExecutionEnvironment)
>   val source = new TestTableSource
>   val sink = new PrintTableSink()
>
>   tables.registerTableSource("test_table", source)
>   tables.sql("SELECT * FROM test_table WHERE last_updated > DATE '2017-05-01'").writeToSink(sink)
> }
>
> class PrintTableSink() extends TableSinkBase[Row] with BatchTableSink[Row] {
>   def emitDataSet(dataSet: DataSet[Row]): Unit = dataSet.print()
>   def getOutputType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
>   protected def copy: TableSinkBase[Row] = new PrintTableSink()
> }
>
> class TestTableSource(val isFilterPushedDown: Boolean = false) extends BatchTableSource[Row] with FilterableTableSource[Row] {
>
>   val getReturnType: RowTypeInfo = {
>     val typeInfo = Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, SqlTimeTypeInfo.DATE)
>     val fieldNames = Array("data", "last_updated")
>     new RowTypeInfo(typeInfo, fieldNames)
>   }
>
>   def applyPredicate(predicates: util.List[Expression]): TableSource[Row] = new TestTableSource(true)
>
>   def getDataSet(execEnv: java.ExecutionEnvironment): java.DataSet[Row] = {
>     execEnv.fromCollection({
>       val data = ListBuffer[Row]()
>       data += row("Success!", Date.valueOf("2017-09-01"))
>       data += row("Failure!", Date.valueOf("2017-01-01"))
>       data
>     })
>   }
>
>   def row(data: String, lastUpdated: Date): Row = {
>     val row = new Row(2)
>     row.setField(0, data)
>     row.setField(1, lastUpdated)
>     row
>   }
> }
> {code}
> Build system is SBT
> {code:none}
> name := "kmurra-flink-reproduction"
> organization := "kmurra"
> version := "1.0"
> scalaVersion := "2.11.8"
>
> resolvers ++= Seq("Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/", Resolver.mavenLocal)
>
> val flinkVersion = "1.3.2"
>
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala" % flinkVersion, // % "provided",
>   "org.apache.flink" %% "flink-table" % flinkVersion, // % "provided",
>   "org.apache.flink" %% "flink-avro" % flinkVersion, // % "provided",
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided",
>   "org.apache.flink" % "flink-jdbc" % flinkVersion // % "provided"
> )
>
> assemblyMergeStrategy in assembly := {
>   case PathList("META-INF", xs @ _*) => MergeStrategy.discard
>   case x => MergeStrategy.first
> }
>
> // exclude Scala library from assembly
> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
> {code}