You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/10/22 03:19:00 UTC
[jira] [Closed] (FLINK-19655) NPE when using blink planner and
TemporalTableFunction after setting IdleStateRetentionTime
[ https://issues.apache.org/jira/browse/FLINK-19655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jark Wu closed FLINK-19655.
---------------------------
Fix Version/s: 1.12.0
Resolution: Fixed
Fixed in master: daeda68edf3466a3f9347c25bdf866ef4f620396
> NPE when using blink planner and TemporalTableFunction after setting IdleStateRetentionTime
> --------------------------------------------------------------------------------------------
>
> Key: FLINK-19655
> URL: https://issues.apache.org/jira/browse/FLINK-19655
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Planner
> Affects Versions: 1.10.0, 1.11.0
> Reporter: seunjjs
> Assignee: seunjjs
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.0
>
>
> My Code here:
> {code:java}
> EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
> tableEnv.getConfig().setIdleStateRetentionTime(Time.seconds(60), Time.seconds(600));
> final Table table = tableEnv.from("tableName");
> final TableFunction<?> function = table.createTemporalTableFunction(
> temporalTableEntry.getTimeAttribute(),
> String.join(",", temporalTableEntry.getPrimaryKeyFields()));
> tableEnv.registerFunction(temporalTableEntry.getName(), function);
> {code}
> And NPE throwed when I executed my program.
> {code:java}
> java.lang.NullPointerException
> at org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention.registerProcessingCleanupTimer(BaseTwoInputStreamOperatorWithStateRetention.java:109)
> at org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator.processElement2(TemporalProcessTimeJoinOperator.java:98)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:145)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:107)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
> at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
> at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> at java.lang.Thread.run(Thread.java:748)
> {code}
>
> And When I changed to useOldPlanner, it worked fine.And when I debuged the code ,I found BaseTwoInputStreamOperatorWithStateRetention#open did not be executed.
> Here is BaseTwoInputStreamOperatorWithStateRetention#open code.
> {code:java}
> public void open() throws Exception {
> initializeTimerService();
> if (stateCleaningEnabled) {
> ValueStateDescriptor<Long> cleanupStateDescriptor =
> new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, Types.LONG);
> latestRegisteredCleanupTimer = getRuntimeContext().getState(cleanupStateDescriptor);
> }
> }
> {code}
> Here is TemporalProcessTimeJoinOperator#open code.
> {code:java}
> public void open() throws Exception {
> this.joinCondition = generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
> FunctionUtils.setFunctionRuntimeContext(joinCondition, getRuntimeContext());
> FunctionUtils.openFunction(joinCondition, new Configuration());
> ValueStateDescriptor<BaseRow> rightStateDesc = new ValueStateDescriptor<>("right", rightType);
> this.rightState = getRuntimeContext().getState(rightStateDesc);
> this.collector = new TimestampedCollector<>(output);
> this.outRow = new JoinedRow();
> // consider watermark from left stream only.
> super.processWatermark2(Watermark.MAX_WATERMARK);
> }
> {code}
> I compared the code with oldplaner(TemporalProcessTimeJoin#open).May be TemporalProcessTimeJoinOperator#open should add super.open()?
> Here is TemporalProcessTimeJoin#open code.
> {code:scala}
> override def open(): Unit = {
> LOG.debug(s"Compiling FlatJoinFunction: $genJoinFuncName \n\n Code:\n$genJoinFuncCode")
> val clazz = compile(
> getRuntimeContext.getUserCodeClassLoader,
> genJoinFuncName,
> genJoinFuncCode)
> LOG.debug("Instantiating FlatJoinFunction.")
> joinFunction = clazz.newInstance()
> FunctionUtils.setFunctionRuntimeContext(joinFunction, getRuntimeContext)
> FunctionUtils.openFunction(joinFunction, new Configuration())
> val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType)
> rightState = getRuntimeContext.getState(rightStateDescriptor)
> collector = new TimestampedCollector[CRow](output)
> cRowWrapper = new CRowWrappingCollector()
> cRowWrapper.out = collector
> super.open()
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)