Posted to user@flink.apache.org by Satyam Shekhar <sa...@gmail.com> on 2020/11/06 07:27:23 UTC

Failure to execute streaming SQL query

Hello,

I have a table T0 with the following schema -

root
      |-- amount: BIGINT
      |-- timestamp: TIMESTAMP(3)
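
(For reference, this schema corresponds to a DDL declaration along these lines - a sketch only, with the connector options elided since they are not relevant here; note that `timestamp` needs backticks because it is a reserved keyword:)

CREATE TABLE T0 (
  amount BIGINT,
  `timestamp` TIMESTAMP(3)
) WITH (
  ...
)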

The following two queries fail on the above table when executed in
streaming mode using the Blink planner.

WITH A AS (
  SELECT COUNT(*) AS ct, tumble_end(`timestamp`, INTERVAL '1' MINUTE) AS tm
    FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
SELECT L.ct, R.ct, L.tm, R.tm FROM A AS L LEFT JOIN A AS R ON L.tm = R.tm

WITH A AS (
  SELECT COUNT(*) AS ct, tumble_rowtime(`timestamp`, INTERVAL '1' MINUTE) AS tm
    FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
SELECT L.ct, R.ct, L.tm, R.tm FROM A AS L LEFT JOIN A AS R ON L.tm = R.tm

The two queries are very similar and differ only in their use of the
tumble_end and tumble_rowtime operators. Both queries use the timestamp
column as their rowtime attribute. Casting the "tm" column to TIMESTAMP
makes both queries work -

WITH A AS (
  SELECT COUNT(*) AS ct,
         CAST(tumble_end(`timestamp`, INTERVAL '1' MINUTE) AS TIMESTAMP(3)) AS tm
    FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
SELECT L.ct, R.ct, L.tm, R.tm FROM A AS L LEFT JOIN A AS R ON L.tm = R.tm

This workaround, however, loses the rowtime attribute from the output
result set of the second query.
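
For completeness, here is the cast applied to the second query as well; it
runs, but "tm" in the output is then a plain TIMESTAMP(3) rather than a
rowtime attribute:

WITH A AS (
  SELECT COUNT(*) AS ct,
         CAST(tumble_rowtime(`timestamp`, INTERVAL '1' MINUTE) AS TIMESTAMP(3)) AS tm
    FROM T0 GROUP BY tumble(`timestamp`, INTERVAL '1' MINUTE))
SELECT L.ct, R.ct, L.tm, R.tm FROM A AS L LEFT JOIN A AS R ON L.tm = R.tm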

The first query fails with the following exception -

java.lang.RuntimeException: class java.sql.Timestamp cannot be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap')
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:110)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at SinkConversion$166.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at StreamExecCalc$163.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.pushToOperator(OperatorChain.java:626)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:603)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.collect(OperatorChain.java:563)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.outputNullPadding(StreamingJoinOperator.java:314)
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement(StreamingJoinOperator.java:206)
    at org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator.processElement1(StreamingJoinOperator.java:115)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord1(StreamTwoInputProcessor.java:132)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$0(StreamTwoInputProcessor.java:99)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:364)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:179)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.lang.Long (java.sql.Timestamp is in module java.sql of loader 'platform'; java.lang.Long is in module java.base of loader 'bootstrap')
    at org.apache.flink.api.common.typeutils.base.LongSerializer.serialize(LongSerializer.java:32)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:192)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:58)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:140)
    at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:37)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:175)
    at org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.serialize(StreamElementSerializer.java:46)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.serializeRecord(SpanningRecordSerializer.java:71)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:117)
    at org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:60)
    at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)

The second one fails with -

java.lang.RuntimeException: Error while applying rule StreamExecIntervalJoinRule(in:LOGICAL,out:STREAM_PHYSICAL), args [rel#606:FlinkLogicalJoin.LOGICAL.any.None: 0.[NONE].[NONE](left=RelSubset#602,right=RelSubset#602,condition==($1, $3),joinType=left)]
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:244)
    at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:636)
    at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:327)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkVolcanoProgram.optimize(FlinkVolcanoProgram.scala:64)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
    at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164)
    at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:80)
    at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
    at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279)
    at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1256)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1213)
    at io.netspring.blaze.eval.FlinkQueryEngine$StreamQuery.start(FlinkQueryEngine.java:353)
    ....
Caused by: java.util.NoSuchElementException: None.get
    at scala.None$.get(Option.scala:347)
    at scala.None$.get(Option.scala:345)
    at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecIntervalJoin.<init>(StreamExecIntervalJoin.scala:72)
    at org.apache.flink.table.planner.plan.rules.physical.stream.StreamExecIntervalJoinRule.convert(StreamExecIntervalJoinRule.scala:122)
    at org.apache.calcite.rel.convert.ConverterRule.onMatch(ConverterRule.java:144)
    at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:217)

I have filed a JIRA issue for this here -
https://issues.apache.org/jira/browse/FLINK-20015.

Looking for insights into how to work around the issue.

Regards,
Satyam

Re: Failure to execute streaming SQL query

Posted by Danny Chan <da...@apache.org>.
Hi, Satyam ~

What version of the Flink release did you use? I tested your first SQL
statement locally and it works fine.

Your second SQL statement fails because we currently do not support
stream-stream joins on time attributes: such a join would break the
semantics of the time attribute (the join output does not keep any order).
The error stacktrace is indeed misleading, though; I have logged an issue
for it here [1].

You may need to declare the watermark strategy in the DDL so that the
`timestamp` column can be recognized as a time attribute, like this:

CREATE TABLE T0 (
  amount BIGINT,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  ...
)
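
If you want a self-contained way to try this out, the DDL can be completed
with the built-in datagen connector, for example (the connector here is
just an illustration; any source works):

CREATE TABLE T0 (
  amount BIGINT,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '5'
)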

[1] https://issues.apache.org/jira/browse/FLINK-20017

Satyam Shekhar <sa...@gmail.com> wrote on Fri, Nov 6, 2020 at 3:27 PM:

> Hello,
>
> I have a table T0 with the following schema -
>
> [snip]