You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Jeff Zhang <zj...@gmail.com> on 2018/11/13 12:49:46 UTC
Field could not be resolved by the field mapping when using kafka connector
Hi,
I hit the following error when I try to use kafka connector in flink table
api. There's very little document about how to use kafka connector in flink
table api, could anyone help me on that ? Thanks
Exception in thread "main" org.apache.flink.table.api.ValidationException:
Field 'event_ts' could not be resolved by the field mapping.
at
org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
at
org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
at
org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
at
org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
at
org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
at
org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
at
org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
at
org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
at
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
at
org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
And here's the source code:
case class Record(status: String, direction: String, var event_ts: Timestamp)
def main(args: Array[String]): Unit = {
val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val data: DataStream[Record] = ...
val tEnv = TableEnvironment.getTableEnvironment(senv)
tEnv
// declare the external system to connect to
.connect(
new Kafka()
.version("0.11")
.topic("processed5.events")
.startFromEarliest()
.property("zookeeper.connect", "localhost:2181")
.property("bootstrap.servers", "localhost:9092"))
.withFormat(new Json()
.failOnMissingField(false)
.deriveSchema()
)
.withSchema(
new Schema()
.field("status", Types.STRING)
.field("direction", Types.STRING)
.field("event_ts", Types.SQL_TIMESTAMP).rowtime(
new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
)
// specify the update-mode for streaming tables
.inAppendMode()
// register as source, sink, or both and under a name
.registerTableSourceAndSink("MyUserTable");
tEnv.fromDataStream(data).insertInto("MyUserTable")
Re: Field could not be resolved by the field mapping when using kafka connector
Posted by Dominik Wosiński <wo...@gmail.com>.
Hey,
Thanks for the info, I haven't noticed that.
I was just going through older messages with no responses.
Best Regards,
Dom.
Re: Field could not be resolved by the field mapping when using kafka
connector
Posted by Chesnay Schepler <ch...@apache.org>.
This issue was already resolved in another thread by the same author.
On 15.11.2018 10:52, Dominik Wosiński wrote:
> Hey,
>
> Could You please show a sample data that You want to process? This
> would help in verifying the issue.
>
> Best Regards,
> Dom.
>
> wt., 13 lis 2018 o 13:58 Jeff Zhang <zjffdu@gmail.com
> <ma...@gmail.com>> napisał(a):
>
> Hi,
>
> I hit the following error when I try to use kafka connector in
> flink table api. There's very little document about how to use
> kafka connector in flink table api, could anyone help me on that ?
> Thanks
>
> Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Field 'event_ts'
> could not be resolved by the field mapping.
> at
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
> at
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
> at
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
> at
> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
> at
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
> at
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
>
> And here's the source code:
>
>
>
> case class Record(status:String, direction:String,var event_ts: Timestamp)
>
>
> def main(args: Array[String]): Unit = {
> val senv = StreamExecutionEnvironment.getExecutionEnvironment senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val data: DataStream[Record] = ...
> val tEnv = TableEnvironment.getTableEnvironment(senv)tEnv
> // declare the external system to connect to .connect(
> new Kafka()
> .version("0.11")
> .topic("processed5.events")
> .startFromEarliest()
> .property("zookeeper.connect","localhost:2181")
> .property("bootstrap.servers","localhost:9092"))
> .withFormat(new Json()
> .failOnMissingField(false)
> .deriveSchema()
> )
> .withSchema(
> new Schema()
> .field("status", Types.STRING)
> .field("direction", Types.STRING)
> .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
> new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
> )
>
> // specify the update-mode for streaming tables .inAppendMode()
>
> // register as source, sink, or both and under a name .registerTableSourceAndSink("MyUserTable");
>
> tEnv.fromDataStream(data).insertInto("MyUserTable")
>
Re: Field could not be resolved by the field mapping when using kafka connector
Posted by Dominik Wosiński <wo...@gmail.com>.
Hey,
Could You please show a sample data that You want to process? This would
help in verifying the issue.
Best Regards,
Dom.
wt., 13 lis 2018 o 13:58 Jeff Zhang <zj...@gmail.com> napisał(a):
> Hi,
>
> I hit the following error when I try to use kafka connector in flink table
> api. There's very little document about how to use kafka connector in flink
> table api, could anyone help me on that ? Thanks
>
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Field 'event_ts' could not be resolved by the field mapping.
> at
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:491)
> at
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at
> org.apache.flink.table.sources.TableSourceUtil$$anonfun$org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:521)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
> at
> org.apache.flink.table.sources.TableSourceUtil$.org$apache$flink$table$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:521)
> at
> org.apache.flink.table.sources.TableSourceUtil$.validateTableSource(TableSourceUtil.scala:127)
> at
> org.apache.flink.table.plan.schema.StreamTableSourceTable.<init>(StreamTableSourceTable.scala:33)
> at
> org.apache.flink.table.api.StreamTableEnvironment.registerTableSourceInternal(StreamTableEnvironment.scala:150)
> at
> org.apache.flink.table.api.TableEnvironment.registerTableSource(TableEnvironment.scala:541)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.scala:47)
> at
> org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSourceAndSink(ConnectTableDescriptor.scala:68)
>
> And here's the source code:
>
>
>
> case class Record(status: String, direction: String, var event_ts: Timestamp)
>
>
> def main(args: Array[String]): Unit = {
> val senv = StreamExecutionEnvironment.getExecutionEnvironment
> senv.setParallelism(1)
> senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
> val data: DataStream[Record] = ...
> val tEnv = TableEnvironment.getTableEnvironment(senv)
> tEnv
> // declare the external system to connect to
> .connect(
> new Kafka()
> .version("0.11")
> .topic("processed5.events")
> .startFromEarliest()
> .property("zookeeper.connect", "localhost:2181")
> .property("bootstrap.servers", "localhost:9092"))
> .withFormat(new Json()
> .failOnMissingField(false)
> .deriveSchema()
> )
> .withSchema(
> new Schema()
> .field("status", Types.STRING)
> .field("direction", Types.STRING)
> .field("event_ts", Types.SQL_TIMESTAMP).rowtime(
> new Rowtime().timestampsFromField("event_ts").watermarksPeriodicAscending())
> )
>
> // specify the update-mode for streaming tables
> .inAppendMode()
>
> // register as source, sink, or both and under a name
> .registerTableSourceAndSink("MyUserTable");
>
> tEnv.fromDataStream(data).insertInto("MyUserTable")
>
>