Posted to user@flink.apache.org by Lehuede sebastien <le...@gmail.com> on 2018/04/25 08:57:28 UTC
Kafka to Flink Avro Deserializer
Hi guys,
I tried to implement my own Avro deserializer following these links:
- https://github.com/okkam-it/flink-examples/blob/master/src/main/java/org/okkam/flink/avro/AvroDeserializationSchema.java
- https://stackoverflow.com/questions/38715286/how-to-decode-kafka-messages-using-avro-and-flink
Compilation succeeds, but when I send an Avro event from Kafka to
Flink, I get the following error:
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp (JobManager.scala:897)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply (JobManager.scala:840)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply (JobManager.scala:840)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1 (Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run (Future.scala:24)
    at akka.dispatch.TaskInvocation.run (AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec (AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec (ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask (ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker (ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run (ForkJoinWorkerThread.java:107)
Caused by: org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
    at org.apache.avro.specific.SpecificData.getSchema (SpecificData.java:227)
    at org.apache.avro.specific.SpecificDatumReader.<init> (SpecificDatumReader.java:37)
    at com.nybble.alpha.AvroDeserializationSchema.ensureInitialized (AvroDeserializationSchema.java:66)
    at com.nybble.alpha.AvroDeserializationSchema.deserialize (AvroDeserializationSchema.java:44)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize (KeyedDeserializationSchemaWrapper.java:42)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop (Kafka09Fetcher.java:139)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:652)
    at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run (Task.java:718)
    at java.lang.Thread.run (Thread.java:748)
Has anyone already encountered this error with Avro deserialization?
I can't find much information about
"avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException".
Regards,
Sebastien.
Re: Kafka to Flink Avro Deserializer
Posted by Lehuede sebastien <le...@gmail.com>.
Hi Timo,
Thanks for your response!
I have defined my Avro schema in "toKafka.avsc" and created my "toKafka.java"
file with:
    # java -jar avro-tools-1.8.2.jar compile schema toKafka.avsc
Then I import the Avro deserialization schema and my generated "toKafka.java" class:
    import com.nybble.alpha.AvroDeserializationSchema;
    import com.nybble.alpha.toKafka;
Am I missing something?
Regards,
Sebastien.
2018-04-25 11:32 GMT+02:00 Timo Walther <tw...@apache.org>:
> Hi Sebastien,
>
> To me this looks more like an Avro issue than a Flink issue. You can ignore
> the shaded exception; we shade Google utilities to avoid dependency
> conflicts.
>
> The root cause is this:
>
> java.lang.NullPointerException
>     at org.apache.avro.specific.SpecificData.getSchema (SpecificData.java:227)
>
> And the corresponding lines look like this:
>
>   /** Find the schema for a Java type. */
>   public Schema getSchema(java.lang.reflect.Type type) {
>     try {
>       return schemaCache.get(type);
>     } catch (Exception e) {
>       throw (e instanceof AvroRuntimeException) ? // line 227
>           (AvroRuntimeException)e.getCause() : new AvroRuntimeException(e);
>     }
>   }
>
> So I guess your schema is missing.
>
> I hope this helps.
>
> Regards,
> Timo
Re: Kafka to Flink Avro Deserializer
Posted by Timo Walther <tw...@apache.org>.
Hi Sebastien,
To me this looks more like an Avro issue than a Flink issue. You can ignore
the shaded exception; we shade Google utilities to avoid dependency
conflicts.
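Since the shaded UncheckedExecutionException is only a wrapper, one way to see what actually failed is to follow getCause() down to the innermost throwable. The following is a minimal sketch using only the JDK; the class name RootCauseDemo and the method rootCause are hypothetical helpers, not part of Flink or Avro, and the exceptions built in main are stand-ins for the chain in the trace.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

/** Hypothetical helper for illustration; not part of Flink or Avro. */
final class RootCauseDemo {

    /** Follows getCause() to the innermost throwable, stopping if the chain loops. */
    static Throwable rootCause(Throwable t) {
        // Identity-based set so that a cyclic cause chain cannot loop forever.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-ins for the chain in the trace: outer failure -> wrapper -> NPE.
        Throwable npe = new NullPointerException();
        Throwable wrapper = new IllegalStateException("wrapper", npe);
        Throwable outer = new RuntimeException("Job execution failed.", wrapper);

        // Prints the class name of the innermost cause.
        System.out.println(rootCause(outer).getClass().getName());
    }
}
```

Calling rootCause on the exception caught around the job execution, and printing its stack trace, would show the frames of the innermost failure rather than those of the wrappers.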
The root cause is this:
java.lang.NullPointerException
    at org.apache.avro.specific.SpecificData.getSchema (SpecificData.java:227)
And the corresponding lines look like this:
  /** Find the schema for a Java type. */
  public Schema getSchema(java.lang.reflect.Type type) {
    try {
      return schemaCache.get(type);
    } catch (Exception e) {
      throw (e instanceof AvroRuntimeException) ? // line 227
          (AvroRuntimeException)e.getCause() : new AvroRuntimeException(e);
    }
  }
So I guess your schema is missing.
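A quick way to test that guess might be to check whether the class handed to the deserializer really carries a schema. Classes generated by the Avro compiler declare a static SCHEMA$ field, so a reflection check is enough for a first look. This is only a sketch using the JDK: SchemaFieldCheck, hasEmbeddedSchema, WithSchema and WithoutSchema are hypothetical names, and the two nested classes are stand-ins so that the example runs without Avro on the classpath. In the real job you would pass toKafka.class instead.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

/** Hypothetical diagnostic for illustration; not part of Flink or Avro. */
final class SchemaFieldCheck {

    /** Returns true if the class declares a static SCHEMA$ field with a non-null value. */
    static boolean hasEmbeddedSchema(Class<?> recordClass) {
        try {
            Field field = recordClass.getDeclaredField("SCHEMA$");
            if (!Modifier.isStatic(field.getModifiers())) {
                return false;
            }
            field.setAccessible(true);
            return field.get(null) != null; // static field, so no instance is needed
        } catch (NoSuchFieldException | IllegalAccessException e) {
            return false;
        }
    }

    /** Stand-in for a generated class; a real one holds an org.apache.avro.Schema here. */
    static final class WithSchema {
        public static final Object SCHEMA$ = "{\"type\":\"record\",\"name\":\"toKafka\",\"fields\":[]}";
    }

    /** Stand-in for a class that was not generated from a schema. */
    static final class WithoutSchema { }

    public static void main(String[] args) {
        System.out.println(hasEmbeddedSchema(WithSchema.class));    // true
        System.out.println(hasEmbeddedSchema(WithoutSchema.class)); // false
    }
}
```

If the check passes for toKafka.class, the schema itself is present, and the next thing worth comparing would be the class actually passed to the deserializer and the package of the generated file against the namespace in toKafka.avsc.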
I hope this helps.
Regards,
Timo