Posted to user@flink.apache.org by sunfulin <su...@163.com> on 2020/01/10 05:50:08 UTC
Null result cannot be used for atomic types
Hi, I am running a Flink app while reading Kafka records with JSON format. And the connect code is like the following:
tableEnv.connect(
        new Kafka()
            .version(kafkaInstance.getVersion())
            .topic(chooseKafkaTopic(initPack.clusterMode))
            .property("bootstrap.servers", kafkaInstance.getBrokerList())
            .property("group.id", initPack.jobName)
            .startFromEarliest()
    ).withSchema(
        new Schema()
            // EVENT_TIME
            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
                new Rowtime()
                    .timestampsFromField("time")
                    .watermarksPeriodicBounded(1000)
            )
            .field("type", Types.STRING)
            .field("event", Types.STRING)
            .field("user_id", Types.STRING)
            .field("distinct_id", Types.STRING)
            .field("project", Types.STRING)
            .field("recv_time", Types.SQL_TIMESTAMP)
            .field("properties", Types.ROW_NAMED(
                new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" },
                Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)
            )
    ).withFormat(
        new Json().failOnMissingField(false)
            .deriveSchema()
    )
    .inAppendMode()
    .registerTableSource(getTableName());
However, the application throws the following exception, which really confuses me. From the code above, the field types are only Types.STRING or Types.SQL_TIMESTAMP, so I am not sure which data field can trigger this. I would appreciate some help from the community.
Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.
    at DataStreamSinkConversion$5.map(Unknown Source)
    at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)
    at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
    at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
    at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
    at DataStreamSourceConversion$2.processElement(Unknown Source)
    at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
    at org.apache.flink.streaming.
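For readers hitting the same message: the exception text comes from Flink's generated conversion code, which rejects a null result when a row is converted to an atomic (non-composite) type such as String. A minimal plain-Java sketch of that behavior (an illustrative stand-in, not Flink's actual generated mapper):

```java
// Hypothetical mini-version of the check behind "Null result cannot be
// used for atomic types": a row-to-atomic-type conversion must not yield
// null, because an atomic type has no field-level null handling the way
// a composite Row type does.
public class AtomicConversionSketch {

    // Stand-in for the generated DataStreamSinkConversion mapper.
    static String toAtomic(Object field) {
        if (field == null) {
            throw new NullPointerException(
                "Null result cannot be used for atomic types.");
        }
        return field.toString();
    }

    public static void main(String[] args) {
        // A present field converts fine.
        System.out.println(toAtomic("click"));
        // A missing/null field (e.g. from failOnMissingField(false)) blows up.
        try {
            toAtomic(null);
        } catch (NullPointerException e) {
            System.out.println("NPE: " + e.getMessage());
        }
    }
}
```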
Re:Re: Null result cannot be used for atomic types
Posted by sunfulin <su...@163.com>.
Hi,
Thanks for the reply. It turns out that I was mixing Table-to-DataStream conversion with tableEnv.sqlUpdate in the same job, which is why the exception was thrown. My mistake.
At 2020-01-10 17:11:02, "Jingsong Li" <ji...@gmail.com> wrote:
Hi sunfulin,
Looks like the error happened in the sink instead of the source.
Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.
at DataStreamSinkConversion$5.map(Unknown Source)
So the point is how you write to the sink. Can you share that code?
Best,
Jingsong Lee
On Fri, Jan 10, 2020 at 2:58 PM godfrey he <go...@gmail.com> wrote:
hi sunfulin,
which Flink version are you using?
best,
godfrey
sunfulin <su...@163.com> wrote on Fri, Jan 10, 2020 at 1:50 PM:
--
Best, Jingsong Lee
Re: Null result cannot be used for atomic types
Posted by Jingsong Li <ji...@gmail.com>.
Hi sunfulin,
Looks like the error happened in the sink instead of the source.
Caused by: java.lang.NullPointerException: Null result cannot be used for
atomic types.
at DataStreamSinkConversion$5.map(Unknown Source)
So the point is how you write to the sink. Can you share that code?
Best,
Jingsong Lee
On Fri, Jan 10, 2020 at 2:58 PM godfrey he <go...@gmail.com> wrote:
> hi sunfulin,
>
> which Flink version are you using?
>
> best,
> godfrey
>
> sunfulin <su...@163.com> wrote on Fri, Jan 10, 2020 at 1:50 PM:
--
Best, Jingsong Lee
Re: Null result cannot be used for atomic types
Posted by godfrey he <go...@gmail.com>.
hi sunfulin,
which Flink version are you using?
best,
godfrey
sunfulin <su...@163.com> wrote on Fri, Jan 10, 2020 at 1:50 PM: