Posted to user-zh@flink.apache.org by Evan <ch...@foxmail.com> on 2020/01/10 01:53:52 UTC
Re: Flink consumes no data from Kafka
First, check the advertised.host.name setting in SensorsData's Kafka broker configuration.
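For reference, a minimal sketch of that broker setting in server.properties; the hostname and port below are placeholders, not values from this thread:

  # Address the broker advertises to clients; it must be resolvable and
  # reachable from the Flink TaskManagers.
  advertised.host.name=kafka-broker-1.example.com

  # advertised.host.name is deprecated on newer Kafka versions; the
  # equivalent setting there is:
  advertised.listeners=PLAINTEXT://kafka-broker-1.example.com:9092

If the broker advertises an address the Flink TaskManagers cannot resolve or reach, the client can still bootstrap but then fetches nothing, which can produce exactly this "no data, no errors" symptom.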
------------------ Original Message ------------------
From: "sunfulin" <sunfulin0321@163.com>;
Sent: Friday, January 10, 2020, 9:51 AM
To: "user-zh@flink.apache.org" <user-zh@flink.apache.org>;
Subject: Flink consumes no data from Kafka
I have a job that uses Flink to consume Kafka message data from SensorsData (see https://manual.sensorsdata.cn/sa/latest/page-1573828.html). After the job starts, however, the Flink task consumes no Kafka messages at all, even though I can see a steady stream of data through SensorsData's kafka console consumer.
The Flink job itself reports no errors. How should I go about troubleshooting this?
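A hedged way to start digging when the job logs show nothing (the broker address and group id below are placeholders) is to ask the brokers about the job's consumer group:

  bin/kafka-consumer-groups.sh --bootstrap-server <broker>:9092 \
      --describe --group <job-group-id>

Note that Flink's Kafka consumer assigns partitions itself and only commits offsets back to Kafka when checkpointing (or auto-commit) is enabled, so an empty result here is not conclusive on its own; DEBUG-level TaskManager logs for the Kafka connector are the other place to look.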
Re: Re: Flink consumes no data from Kafka
Posted by sunfulin <su...@163.com>.
Thanks for the reply. After checking, it was indeed the hostname configuration issue.
The job then ran into another problem. Below is the Kafka connector configuration it reads with, which uses a JSON schema to parse the messages; at runtime, however, it threw the following exception (the configuration follows the stack trace). Does anyone know what might cause this?
Caused by: java.lang.NullPointerException: Null result cannot be used for atomic types.
at DataStreamSinkConversion$5.map(Unknown Source)
at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:55)
at org.apache.flink.table.runtime.CRowMapRunner.map(CRowMapRunner.scala:34)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:37)
at org.apache.flink.table.runtime.CRowWrappingCollector.collect(CRowWrappingCollector.scala:28)
at DataStreamSourceConversion$2.processElement(Unknown Source)
at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at org.apache.flink.streaming.
tableEnv.connect(
        new Kafka()
            .version(kafkaInstance.getVersion())
            .topic(chooseKafkaTopic(initPack.clusterMode))
            .property("bootstrap.servers", kafkaInstance.getBrokerList())
            .property("group.id", initPack.jobName)
            .startFromEarliest()  // for testing; remove before going to production
    ).withSchema(
        new Schema()
            // event-time (rowtime) field
            .field("rowtime", Types.SQL_TIMESTAMP).rowtime(
                new Rowtime()
                    .timestampsFromField("time")
                    .watermarksPeriodicBounded(1000)
            )
            .field("type", Types.STRING)
            .field("event", Types.STRING)
            .field("user_id", Types.STRING)
            .field("distinct_id", Types.STRING)
            .field("project", Types.STRING)
            .field("recv_time", Types.SQL_TIMESTAMP)
            .field("properties", Types.ROW_NAMED(
                new String[] { "BROWSER_VERSION", "pathname", "search", "eventType", "message", "stack", "componentStack" },
                Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)
            )
    ).withFormat(
        new Json().failOnMissingField(false)
            .deriveSchema()
    )
    .inAppendMode()
    .registerTableSource(getTableName());
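A hedged note, not a confirmed diagnosis: "Null result cannot be used for atomic types" is typically raised when a null value reaches a field declared with a non-nullable atomic type. With failOnMissingField(false), any field absent from the incoming JSON becomes null, so a record missing e.g. the "time" field that feeds the rowtime could trigger this. A minimal sketch of a pre-filter on the raw stream; "rawStream" (a DataStream<String> read from the same topic) and the field name are assumptions for illustration:

  import org.apache.flink.streaming.api.datastream.DataStream;
  import com.fasterxml.jackson.databind.ObjectMapper;

  // Drop records whose JSON lacks a usable "time" field before they reach
  // the table conversion. Creating an ObjectMapper per record keeps the
  // lambda serializable; reuse one per task in real code (e.g. in a
  // RichFilterFunction).
  DataStream<String> withTime = rawStream.filter(value ->
          new ObjectMapper().readTree(value).hasNonNull("time"));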