Posted to user@flink.apache.org by Dan Hill <qu...@gmail.com> on 2020/10/09 06:09:02 UTC
Any testing issues when using StreamTableEnvironment.createTemporaryView?
*Summary*
I'm hitting an error in a test that uses createTemporaryView to convert a
Protobuf input stream to the Flink Table API. I'm not sure how to debug the
"SourceConversion$5.processElement(Unknown Source)" line. Is this generated
code? How can I debug it?
Any help would be appreciated. Thanks! - Dan
*Details*
My current input is a protocol buffer stream. I convert it to the Table
API spec using createTemporaryView. The code is hacky. I want to get some
tests implemented before cleaning it up.
KeyedStream<BatchLog, String> batchLogStream =
    env.<BatchLog>fromElements(
            BatchLog.class, new LogGenerator.BatchLogIterator().next())
        .keyBy((logRequest) -> logRequest.getUserId());

tableEnv.createTemporaryView(
    "input_user",
    batchLogStream.flatMap(new ToUsers()),
    $("userId"),
    $("timeEpochMillis"),
    $("userTime").rowtime());
This appears to work in my prototype (maybe serialization is broken). In a
Flink test, I hit the following error.
org.apache.flink.runtime.taskmanager.Task: Flat Map -> Map ->
SourceConversion(table=[default.mydb.input_user], fields=[userId,
timeEpochMillis, userTime]) -> Calc(select=[userId, timeEpochMillis]) ->
StreamingFileWriter (2/7) (ae67114dd4175c6fd87063f73706c8ec) switched from
RUNNING to FAILED.
java.lang.NullPointerException
    at SourceConversion$5.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
    at ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:18)
    at ai.promoted.metrics.logprocessor.common.functions.ToUsers.flatMap(ToUsers.java:11)
    at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:50)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.base/java.lang.Thread.run(Thread.java:834)
I wasn't able to find this exact stacktrace when looking on Google.
Re: Any testing issues when using StreamTableEnvironment.createTemporaryView?
Posted by Dan Hill <qu...@gmail.com>.
I figured out my issue. I needed to assign timestamps and watermarks (e.g.
with assignTimestampsAndWatermarks) after the fromElements call. I could not
figure out how the auto-generated code worked, so I hooked up a debugger and
guessed at the issue.
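
For anyone hitting the same trace, here is a minimal sketch of what that fix might look like. It is not the code from my project: the Event class, the timestampOf helper, the 5-second out-of-orderness bound, and the sample values are all made up for illustration, and it assumes the WatermarkStrategy API available in Flink 1.11 and later. On versions before 1.12 you may also need to switch the environment to event time.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkedFromElements {

    /** Hypothetical stand-in for the real record type (BatchLog in the thread). */
    public static class Event {
        public String userId;
        public long timeEpochMillis;

        public Event() {}

        public Event(String userId, long timeEpochMillis) {
            this.userId = userId;
            this.timeEpochMillis = timeEpochMillis;
        }
    }

    /** Event time of a record, in epoch milliseconds (illustrative helper). */
    public static long timestampOf(Event event) {
        return event.timeEpochMillis;
    }

    /** Assumed strategy: tolerate up to 5 seconds of out-of-order events. */
    public static WatermarkStrategy<Event> strategy() {
        return WatermarkStrategy
            .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((event, previousTimestamp) -> timestampOf(event));
    }

    /** Test source whose records carry timestamps before any rowtime conversion. */
    public static DataStream<Event> source(StreamExecutionEnvironment env) {
        return env
            .fromElements(new Event("user-1", 1_000L), new Event("user-2", 2_000L))
            // Without this call the records emitted by fromElements have no timestamp.
            .assignTimestampsAndWatermarks(strategy());
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        source(env).print();
        env.execute("watermarked-from-elements");
    }
}
```

The stream returned by source(env) is what would then be keyed, flat-mapped, and passed to createTemporaryView with the rowtime() field.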