You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "padavan (Jira)" <ji...@apache.org> on 2023/04/28 08:08:00 UTC
[jira] [Updated] (FLINK-31967) SQL with LAG function NullPointerException
[ https://issues.apache.org/jira/browse/FLINK-31967?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
padavan updated FLINK-31967:
----------------------------
Description:
I want to make a query with the LAG function. And get Job Exception without any explanations.
*Code:*
{{private static void t1_LeadLag(DataStream<UserModel> ds, StreamExecutionEnvironment env) {
StreamTableEnvironment te = StreamTableEnvironment.create(env);
Table t = te.fromDataStream(ds, Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
te.createTemporaryView("users", t);
Table res = te.sqlQuery("SELECT userId, `count`,\n" +
" LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity\n" +
" FROM users");
te.toChangelogStream(res).print();
}}}
*Input:*
{quote}Unknown macro: \{"userId"}{quote}
*Exception:* I remove part about basic JobExecutionException and kept the important(i think)
{{Caused by: java.lang.NullPointerException
at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
at org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown Source)
at org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101)
at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source)
at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92)
at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)}}
was:
I want to make a query with the LAG function. And get Job Exception without any explanations.
*Code:*
{{private static void t1_LeadLag(DataStream<UserModel> ds, StreamExecutionEnvironment env) \{
StreamTableEnvironment te = StreamTableEnvironment.create(env);
Table t = te.fromDataStream(ds, Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
te.createTemporaryView("users", t);
Table res = te.sqlQuery("SELECT userId, `count`,\n" +
" LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity\n" +
" FROM users");
te.toChangelogStream(res).print();
}}}
*Input:*
{quote}{"userId":3,"count":0,"dt":"2023-04-28T07:44:21.551Z"}
{quote}
*Exception:* I remove part about basic JobExecutionException and kept the important(i think)
{{Caused by: java.lang.NullPointerException
at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
at org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown Source)
at org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101)
at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source)
at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92)
at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42)
at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.base/java.lang.Thread.run(Thread.java:829)}}
> SQL with LAG function NullPointerException
> ------------------------------------------
>
> Key: FLINK-31967
> URL: https://issues.apache.org/jira/browse/FLINK-31967
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Reporter: padavan
> Priority: Major
>
> I want to make a query with the LAG function. And get Job Exception without any explanations.
>
> *Code:*
>
> {{private static void t1_LeadLag(DataStream<UserModel> ds, StreamExecutionEnvironment env) {
> StreamTableEnvironment te = StreamTableEnvironment.create(env);
> Table t = te.fromDataStream(ds, Schema.newBuilder().columnByExpression("proctime", "proctime()").build());
> te.createTemporaryView("users", t);
> Table res = te.sqlQuery("SELECT userId, `count`,\n" +
> " LAG(`count`) OVER (PARTITION BY userId ORDER BY proctime) AS prev_quantity\n" +
> " FROM users");
> te.toChangelogStream(res).print();
> }}}
> *Input:*
> {quote}Unknown macro: \{"userId"}{quote}
> *Exception:* I remove part about basic JobExecutionException and kept the important(i think)
>
>
>
> {{Caused by: java.lang.NullPointerException
> at org.apache.flink.table.data.GenericRowData.getInt(GenericRowData.java:149)
> at org.apache.flink.table.data.RowData.lambda$createFieldGetter$245ca7d1$6(RowData.java:245)
> at org$apache$flink$table$runtime$functions$aggregate$LagAggFunction$LagAcc$2$Converter.toExternal(Unknown Source)
> at org.apache.flink.table.data.conversion.StructuredObjectConverter.toExternal(StructuredObjectConverter.java:101)
> at UnboundedOverAggregateHelper$15.setAccumulators(Unknown Source)
> at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:92)
> at org.apache.flink.table.runtime.operators.over.ProcTimeUnboundedPrecedingFunction.processElement(ProcTimeUnboundedPrecedingFunction.java:42)
> at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
> at org.apache.flink.streaming.runtime.io.RecordProcessorUtils.lambda$getRecordProcessor$0(RecordProcessorUtils.java:60)
> at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
> at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550)
> at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839)
> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788)
> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952)
> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
> at java.base/java.lang.Thread.run(Thread.java:829)}}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)