Posted to user@flink.apache.org by Lehuede sebastien <le...@gmail.com> on 2018/04/25 08:57:28 UTC

Kafka to Flink Avro Deserializer

Hi Guys,

I tried to implement my Avro deserializer following these links:

   -
   https://github.com/okkam-it/flink-examples/blob/master/src/main/java/org/okkam/flink/avro/AvroDeserializationSchema.java
   -
   https://stackoverflow.com/questions/38715286/how-to-decode-kafka-messages-using-avro-and-flink
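
For context, the deserializer from those links boils down to roughly this pattern (a sketch, not my exact code; it targets the Flink 1.4-era `org.apache.flink.streaming.util.serialization` API, and the lazy `ensureInitialized()` matches the frames in the stack trace below):

```java
import java.io.IOException;

import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

public class AvroDeserializationSchema<T extends SpecificRecordBase>
        implements DeserializationSchema<T> {

    private final Class<T> avroType;
    private transient SpecificDatumReader<T> reader;
    private transient BinaryDecoder decoder;

    public AvroDeserializationSchema(Class<T> avroType) {
        this.avroType = avroType;
    }

    private void ensureInitialized() {
        if (reader == null) {
            // SpecificDatumReader derives the schema from the generated class.
            // This is the call that ends in SpecificData.getSchema(), which
            // fails if the class has no usable SCHEMA$ field.
            reader = new SpecificDatumReader<>(avroType);
        }
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        ensureInitialized();
        decoder = DecoderFactory.get().binaryDecoder(message, decoder);
        return reader.read(null, decoder);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return TypeExtractor.getForClass(avroType);
    }
}
```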


The code actually compiles fine, but when I send an Avro event from Kafka to
Flink, I get the following error:

org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp (JobManager.scala:897)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply (JobManager.scala:840)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply (JobManager.scala:840)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1 (Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run (Future.scala:24)
    at akka.dispatch.TaskInvocation.run (AbstractDispatcher.scala:39)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec (AbstractDispatcher.scala:415)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec (ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask (ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker (ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run (ForkJoinWorkerThread.java:107)
Caused by: org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.NullPointerException
    at org.apache.avro.specific.SpecificData.getSchema (SpecificData.java:227)
    at org.apache.avro.specific.SpecificDatumReader.<init> (SpecificDatumReader.java:37)
    at com.nybble.alpha.AvroDeserializationSchema.ensureInitialized (AvroDeserializationSchema.java:66)
    at com.nybble.alpha.AvroDeserializationSchema.deserialize (AvroDeserializationSchema.java:44)
    at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize (KeyedDeserializationSchemaWrapper.java:42)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop (Kafka09Fetcher.java:139)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:652)
    at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:86)
    at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:55)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run (SourceStreamTask.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke (StreamTask.java:264)
    at org.apache.flink.runtime.taskmanager.Task.run (Task.java:718)
    at java.lang.Thread.run (Thread.java:748)

Has anyone already encountered this error with Avro deserialization?

I can't find much information about
"avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException".


Regards,
Sebastien.

Re: Kafka to Flink Avro Deserializer

Posted by Lehuede sebastien <le...@gmail.com>.
Hi Timo,

Thanks for your response !

I defined my Avro schema in "toKafka.avsc" and generated my "toKafka.java"
file with:

*#java -jar avro-tools-1.8.2.jar compile schema toKafka.avsc*

Then I import the Avro deserialization schema and my generated "toKafka" class:

*import com.nybble.alpha.AvroDeserializationSchema;*
*import com.nybble.alpha.toKafka;*

Am I missing something?
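
For what it's worth, here is roughly how I wire the generated class into the consumer (a sketch; the topic name, bootstrap servers and group id below are placeholders, and the consumer version matches the Kafka09Fetcher in the stack trace):

```java
import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

import com.nybble.alpha.AvroDeserializationSchema;
import com.nybble.alpha.toKafka;

public class Job {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "flink-avro");              // placeholder

        // The schema must be constructed with the *generated* class so that
        // Avro's SpecificData can find its SCHEMA$ field.
        FlinkKafkaConsumer09<toKafka> consumer = new FlinkKafkaConsumer09<>(
                "toKafka",                                   // placeholder topic
                new AvroDeserializationSchema<>(toKafka.class),
                props);

        env.addSource(consumer).print();
        env.execute("Kafka to Flink Avro");
    }
}
```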

Regards,
Sebastien.


2018-04-25 11:32 GMT+02:00 Timo Walther <tw...@apache.org>:

> Hi Sebastien,
>
> to me this seems more like an Avro issue than a Flink issue. You can ignore
> the shaded exception; we shade the Google utilities to avoid dependency
> conflicts.
>
> The root cause is this:
>
> java.lang.NullPointerException
>     at org.apache.avro.specific.SpecificData.getSchema (SpecificData.java:227)
>
> And the corresponding lines look like this:
>
>   /** Find the schema for a Java type. */
>   public Schema getSchema(java.lang.reflect.Type type) {
>     try {
>       return schemaCache.get(type);
>     } catch (Exception e) {
>       throw (e instanceof AvroRuntimeException) ? // line 227
>           (AvroRuntimeException)e.getCause() : new AvroRuntimeException(e);
>     }
>   }
>
> So I guess your schema is missing.
>
> I hope this helps.
>
> Regards,
> Timo
>
> On 25.04.18 at 10:57, Lehuede sebastien wrote:
>
>> java.lang.NullPointerException
>>     at org.apache.avro.specific.SpecificData.getSchema
>>
>
>
>

Re: Kafka to Flink Avro Deserializer

Posted by Timo Walther <tw...@apache.org>.
Hi Sebastien,

to me this seems more like an Avro issue than a Flink issue. You can ignore
the shaded exception; we shade the Google utilities to avoid dependency
conflicts.

The root cause is this:

java.lang.NullPointerException
     at org.apache.avro.specific.SpecificData.getSchema (SpecificData.java:227)

And the corresponding lines look like this:

   /** Find the schema for a Java type. */
   public Schema getSchema(java.lang.reflect.Type type) {
     try {
       return schemaCache.get(type);
     } catch (Exception e) {
       throw (e instanceof AvroRuntimeException) ? // line 227
           (AvroRuntimeException)e.getCause() : new AvroRuntimeException(e);
     }
   }

So I guess your schema is missing.
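
To make the wrapping concrete, the chain in the stack trace can be reproduced in plain Java. This is a sketch with stand-in exception classes (the real UncheckedExecutionException comes from Avro's shaded Guava cache); lookup() mirrors the catch block quoted above:

```java
import java.util.concurrent.Callable;

// Stand-ins for the real exception types (for illustration only).
class UncheckedExecutionException extends RuntimeException {
    UncheckedExecutionException(Throwable cause) { super(cause); }
}

class AvroRuntimeException extends RuntimeException {
    AvroRuntimeException(Throwable cause) { super(cause); }
}

public class WrapDemo {

    // Mirrors the getSchema() catch block: the loader's exception is not an
    // AvroRuntimeException, so it gets wrapped in a new AvroRuntimeException.
    static RuntimeException lookup(Callable<Object> cacheLoader) {
        try {
            cacheLoader.call();
            return null;
        } catch (Exception e) {
            return (e instanceof AvroRuntimeException)
                    ? (AvroRuntimeException) e.getCause()
                    : new AvroRuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // The cache loader hits the NPE; the (simulated) cache wraps it in
        // UncheckedExecutionException before getSchema() sees it.
        RuntimeException ex = lookup(() -> {
            throw new UncheckedExecutionException(new NullPointerException());
        });
        System.out.println(ex.getClass().getSimpleName());
        System.out.println(ex.getCause().getClass().getSimpleName());
        System.out.println(ex.getCause().getCause().getClass().getSimpleName());
    }
}
```

So the NullPointerException you should chase is the innermost cause, not the shaded wrapper.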

I hope this helps.

Regards,
Timo

On 25.04.18 at 10:57, Lehuede sebastien wrote:
> java.lang.NullPointerException
>     at org.apache.avro.specific.SpecificData.getSchema