You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "seunjjs (Jira)" <ji...@apache.org> on 2020/10/15 11:02:00 UTC

[jira] [Created] (FLINK-19655) NPE when using blink planner and TemporalTableFunction after setting IdleStateRetentionTime

seunjjs created FLINK-19655:
-------------------------------

             Summary: NPE when using blink planner and TemporalTableFunction after setting IdleStateRetentionTime 
                 Key: FLINK-19655
                 URL: https://issues.apache.org/jira/browse/FLINK-19655
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.10.0
            Reporter: seunjjs


My Code here:
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
tableEnv.getConfig().setIdleStateRetentionTime(Time.seconds(60), Time.seconds(600));

final Table table = tableEnv.from("tableName");
final TableFunction<?> function = table.createTemporalTableFunction(
                    temporalTableEntry.getTimeAttribute(),
                    String.join(",", temporalTableEntry.getPrimaryKeyFields()));
tableEnv.registerFunction(temporalTableEntry.getName(), function);

And NPE throwed when I executed my program.
java.lang.NullPointerException
	at org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention.registerProcessingCleanupTimer(BaseTwoInputStreamOperatorWithStateRetention.java:109)
	at org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator.processElement2(TemporalProcessTimeJoinOperator.java:98)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:145)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:107)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
	at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
	at java.lang.Thread.run(Thread.java:748)
    
And When I changed to useOldPlanner, it worked fine.And when I debuged the code ,I found BaseTwoInputStreamOperatorWithStateRetention#open did not be executed.
Here is BaseTwoInputStreamOperatorWithStateRetention#open code.
public void open() throws Exception {
		initializeTimerService();

		if (stateCleaningEnabled) {
			ValueStateDescriptor<Long> cleanupStateDescriptor =
				new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, Types.LONG);
			latestRegisteredCleanupTimer = getRuntimeContext().getState(cleanupStateDescriptor);
		}
	}
Here is TemporalProcessTimeJoinOperator#open code.
public void open() throws Exception {
		this.joinCondition = generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
		FunctionUtils.setFunctionRuntimeContext(joinCondition, getRuntimeContext());
		FunctionUtils.openFunction(joinCondition, new Configuration());

		ValueStateDescriptor<BaseRow> rightStateDesc = new ValueStateDescriptor<>("right", rightType);
		this.rightState = getRuntimeContext().getState(rightStateDesc);
		this.collector = new TimestampedCollector<>(output);
		this.outRow = new JoinedRow();
		// consider watermark from left stream only.
		super.processWatermark2(Watermark.MAX_WATERMARK);
	}
I compared the code with oldplaner(TemporalProcessTimeJoin#open).May be TemporalProcessTimeJoinOperator#open should add super.open()?
Here is TemporalProcessTimeJoin#open code.
override def open(): Unit = {
    LOG.debug(s"Compiling FlatJoinFunction: $genJoinFuncName \n\n Code:\n$genJoinFuncCode")
    val clazz = compile(
      getRuntimeContext.getUserCodeClassLoader,
      genJoinFuncName,
      genJoinFuncCode)

    LOG.debug("Instantiating FlatJoinFunction.")
    joinFunction = clazz.newInstance()
    FunctionUtils.setFunctionRuntimeContext(joinFunction, getRuntimeContext)
    FunctionUtils.openFunction(joinFunction, new Configuration())

    val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType)
    rightState = getRuntimeContext.getState(rightStateDescriptor)

    collector = new TimestampedCollector[CRow](output)
    cRowWrapper = new CRowWrappingCollector()
    cRowWrapper.out = collector

    super.open()
  }



--
This message was sent by Atlassian Jira
(v8.3.4#803005)