You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Jiang Xin (Jira)" <ji...@apache.org> on 2022/12/14 15:35:00 UTC
[jira] [Created] (FLINK-30420) NPE thrown when using window time in Table API
Jiang Xin created FLINK-30420:
---------------------------------
Summary: NPE thrown when using window time in Table API
Key: FLINK-30420
URL: https://issues.apache.org/jira/browse/FLINK-30420
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Reporter: Jiang Xin
Run the following unit test and it would fail.
{code:java}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource;
import org.apache.flink.streaming.api.functions.source.datagen.SequenceGenerator;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.junit.Before;
import org.junit.Test;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import static org.apache.flink.table.api.DataTypes.TIMESTAMP;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
import static org.apache.flink.table.api.Expressions.currentTimestamp;
import static org.apache.flink.table.api.Expressions.lit;
public class TestAggWithSourceWatermark {
private StreamTableEnvironment tEnv;
private StreamExecutionEnvironment env;
@Before
public void before() {
Configuration config = new Configuration();
env = StreamExecutionEnvironment.getExecutionEnvironment(config);
env.setParallelism(1);
tEnv = StreamTableEnvironment.create(env);
}
@Test
public void testWindowTime() {
DataStream<Integer> stream =
env.addSource(
new DataGeneratorSource<>(
SequenceGenerator.intGenerator(0, 30), 1, 30l))
.returns(Integer.class);
DataStream<Tuple2<Integer, Long>> streamWithTime =
stream.map(x -> Tuple2.of(x, System.currentTimeMillis()))
.returns(Types.TUPLE(Types.INT, Types.LONG))
.assignTimestampsAndWatermarks(
WatermarkStrategy.<Tuple2<Integer, Long>>forBoundedOutOfOrderness(
Duration.ofSeconds(2))
.withTimestampAssigner(
(ctx) -> (element, recordTimestamp) -> element.f1));
Schema schema =
Schema.newBuilder()
.column("f0", DataTypes.INT())
.column("f1", DataTypes.BIGINT())
.columnByMetadata("rowtime", "TIMESTAMP_LTZ(3)")
.watermark("rowtime", "SOURCE_WATERMARK()")
.build();
Table table = tEnv.fromDataStream(streamWithTime, schema);
table = table.select($("rowtime"));
Table windowedTable =
table.window(Tumble.over("5.seconds").on("rowtime").as("w"))
.groupBy($("w"))
.select(
call(UDAF.class, $("rowtime")).as("row_times"),
$("w").rowtime().as("window_time"),
currentTimestamp().as("current_timestamp"));
windowedTable =
windowedTable
.joinLateral(call(SplitFunction.class, $("row_times")).as("rowtime"))
.select(
$("rowtime").cast(TIMESTAMP(3)).as("rowtime"),
$("window_time"),
$("current_timestamp"));
windowedTable.printSchema();
windowedTable.execute().print();
}
public static class SplitFunction extends TableFunction<Timestamp> {
public void eval(List<Timestamp> times) {
for (int i = 0; i < times.size(); i++) {
collect(times.get(i));
}
}
}
public static class UDAF extends AggregateFunction<List<Timestamp>, List<Timestamp>> {
public UDAF() {}
@Override
public List<Timestamp> createAccumulator() {
return new ArrayList<>();
}
public void accumulate(List<Timestamp> accumulator, Timestamp num) {
accumulator.add(num);
}
@Override
public List<Timestamp> getValue(List<Timestamp> accumulator) {
return accumulator;
}
}
} {code}
Then the following exception occurs
{code:java}
java.lang.NullPointerException
at org.apache.calcite.sql2rel.RelDecorrelator.getNewForOldInputRef(RelDecorrelator.java:1359)
at org.apache.calcite.sql2rel.RelDecorrelator.access$400(RelDecorrelator.java:122)
at org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1638)
at org.apache.calcite.sql2rel.RelDecorrelator$DecorrelateRexShuttle.visitInputRef(RelDecorrelator.java:1595)
at org.apache.calcite.rex.RexInputRef.accept(RexInputRef.java:112)
at org.apache.calcite.rex.RexShuttle.visitList(RexShuttle.java:158)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:110)
at org.apache.calcite.rex.RexShuttle.visitCall(RexShuttle.java:33)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:174)
at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateExpr(RelDecorrelator.java:348)
at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:759)
at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525)
at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687)
at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1170)
at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:1153)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525)
at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687)
at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:734)
at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:723)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525)
at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687)
at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateRel(RelDecorrelator.java:391)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.calcite.util.ReflectUtil$2.invoke(ReflectUtil.java:525)
at org.apache.calcite.sql2rel.RelDecorrelator.getInvoke(RelDecorrelator.java:687)
at org.apache.calcite.sql2rel.RelDecorrelator.decorrelate(RelDecorrelator.java:276)
at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:200)
at org.apache.calcite.sql2rel.RelDecorrelator.decorrelateQuery(RelDecorrelator.java:165)
at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:41)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:56)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:51)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.immutable.Range.foreach(Range.scala:155)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:51)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:59)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156)
at scala.collection.Iterator.foreach(Iterator.scala:937)
at scala.collection.Iterator.foreach$(Iterator.scala:937)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
at scala.collection.IterableLike.foreach(IterableLike.scala:70)
at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154)
at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:55)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:175)
at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:82)
at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:75)
at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:307)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:187)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1656)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:828)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1317)
at org.apache.flink.table.api.internal.TableImpl.execute(TableImpl.java:605)
at org.apache.flink.ml.clustering.TestAggWithSourceWatermark.testWindowTime(TestAggWithSourceWatermark.java:109)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)