Posted to user@flink.apache.org by dim5b <dm...@gmail.com> on 2018/03/21 15:40:21 UTC

Confluent Schema Registry DeserializationSchema

I am trying to connect to Schema Registry and deserialize Avro messages in my project.

When I build the project with mvn, I get this error:

 class file for kafka.utils.VerifiableProperties not found...


import java.io.IOException;

import eu.neurocom.avro.CelloAvro;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDecoder;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;


public class ConfluentAvroDeserializationSchema implements DeserializationSchema<CelloAvro> {

    private final String schemaRegistryUrl;
    private final int identityMapCapacity;
    private KafkaAvroDecoder kafkaAvroDecoder;

    public ConfluentAvroDeserializationSchema(String schemaRegistryUrl) {
        this(schemaRegistryUrl, 1000);
    }

    public ConfluentAvroDeserializationSchema(String schemaRegistryUrl, int identityMapCapacity) {
        this.schemaRegistryUrl = schemaRegistryUrl;
        this.identityMapCapacity = identityMapCapacity;
    }

    @Override
    public CelloAvro deserialize(byte[] bytes) throws IOException {
        if (kafkaAvroDecoder == null) {
            SchemaRegistryClient schemaRegistry =
                    new CachedSchemaRegistryClient(this.schemaRegistryUrl, this.identityMapCapacity);
            this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
        }
        return (CelloAvro) this.kafkaAvroDecoder.fromBytes(bytes);
    }

    @Override
    public boolean isEndOfStream(CelloAvro celloAvro) {
        return false;
    }

    @Override
    public TypeInformation<CelloAvro> getProducedType() {
        return TypeExtractor.getForClass(CelloAvro.class);
    }
}
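
The class above creates the decoder inside deserialize rather than in the constructor. A likely reason, stated here as an assumption, is that Flink's DeserializationSchema is Serializable and the schema object is serialized before it runs, while a client-backed decoder is typically not serializable. The JDK-only sketch below shows the general pattern with a transient field that is rebuilt on first use. LazyDecoderDemo, Decoder and LazySchema are hypothetical stand-ins, not Flink or Confluent classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class LazyDecoderDemo {

    // Hypothetical stand-in for a client object that is NOT Serializable.
    static final class Decoder {
        private final String url;

        Decoder(String url) {
            this.url = url;
        }

        String decode(byte[] bytes) {
            return url + ":" + bytes.length;
        }
    }

    // Serializable holder: only the configuration travels, the decoder does not.
    static final class LazySchema implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String url;
        private transient Decoder decoder; // skipped by Java serialization

        LazySchema(String url) {
            this.url = url;
        }

        String deserialize(byte[] bytes) {
            if (decoder == null) {
                // Rebuilt lazily, e.g. after the object was deserialized elsewhere.
                decoder = new Decoder(url);
            }
            return decoder.decode(bytes);
        }
    }

    // Serializes and deserializes the schema in memory.
    static LazySchema roundTrip(LazySchema schema) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(schema);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (LazySchema) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        LazySchema schema = new LazySchema("http://localhost:8081"); // hypothetical URL
        schema.deserialize(new byte[3]);     // the decoder is now initialized
        LazySchema copy = roundTrip(schema); // succeeds because the field is transient
        System.out.println(copy.deserialize(new byte[3]));
    }
}
```

Without the transient modifier, serializing the schema after the decoder has been created would fail with a NotSerializableException in this sketch.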

My dependencies are:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro</artifactId>
    <version>${flink.version}</version>
</dependency>

<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>4.0.0</version>
</dependency>


Could someone please help? I see there is an open issue for an end-to-end
test with Confluent's Schema Registry:

https://issues.apache.org/jira/browse/FLINK-8970






--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Confluent Schema Registry DeserializationSchema

Posted by dim5b <dm...@gmail.com>.
I added kafka to my dependencies, although I am not sure why this would be
required. It seems to work:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_${kafka.scala.version}</artifactId>
    <version>${kafka.version}</version>
</dependency>

This is my full dependency list...

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.7</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.17</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>eu.neurocom</groupId>
        <artifactId>mip-model-poc</artifactId>
        <version>1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-avro</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${kafka.scala.version}</artifactId>
        <version>${kafka.version}</version>
    </dependency>
</dependencies>

This does solve the build error, but now I am getting the following error:


java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to eu.neurocom.avro.CelloAvro
	at eu.neurocom.schema.ConfluentAvroDeserializationSchema.deserialize(ConfluentAvroDeserializationSchema.java:37)
	at eu.neurocom.schema.ConfluentAvroDeserializationSchema.deserialize(ConfluentAvroDeserializationSchema.java:16)
	at org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper.deserialize(KeyedDeserializationSchemaWrapper.java:42)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:139)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:652)
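
One possible cause, not confirmed in this thread: by default Confluent's Avro deserializers return generic records (GenericData$Record), and they only produce generated classes such as CelloAvro when the specific.avro.reader setting is true. A decoder built with only a SchemaRegistryClient never receives that setting. The sketch below uses only the JDK to build such a configuration. The names DecoderProps and specificReaderProps and the URL http://localhost:8081 are hypothetical. To my knowledge KafkaAvroDecoder also has a constructor taking a SchemaRegistryClient and a VerifiableProperties, but that should be checked against the Javadoc of the Confluent version in use.

```java
import java.util.Properties;

// Sketch only: builds the settings a Confluent Avro decoder would be configured with.
final class DecoderProps {

    // Setting names as used by Confluent's deserializer configuration.
    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";
    static final String SPECIFIC_AVRO_READER = "specific.avro.reader";

    // Returns properties that ask for generated (specific) record classes
    // instead of generic records.
    static Properties specificReaderProps(String schemaRegistryUrl) {
        Properties props = new Properties();
        props.setProperty(SCHEMA_REGISTRY_URL, schemaRegistryUrl);
        props.setProperty(SPECIFIC_AVRO_READER, "true");
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical registry URL, for illustration only.
        Properties props = specificReaderProps("http://localhost:8081");
        System.out.println(props.getProperty(SPECIFIC_AVRO_READER));

        // Assumption: inside deserialize(), the decoder would then be created as
        //   new KafkaAvroDecoder(schemaRegistry, new VerifiableProperties(props));
        // which needs the Kafka and Confluent jars and is therefore left as a comment.
    }
}
```

If this is the cause, the cast to CelloAvro in deserialize should succeed once the decoder is configured this way, provided the generated CelloAvro class matches the writer schema.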






Re: Confluent Schema Registry DeserializationSchema

Posted by Piotr Nowojski <pi...@data-artisans.com>.
Hi,

It looks to me like kafka.utils.VerifiableProperties comes from the org.apache.kafka:kafka artifact. Please check your pom.xml for dependency conflicts involving this artifact and resolve them if possible. There is probably a version collision.

Piotrek
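
A quick way to see which jar, if any, supplies a class at run time is to ask the class loader. The sketch below uses only JDK calls. ClasspathCheck and locate are hypothetical names. It inspects the runtime classpath, so for the compile-time error it complements rather than replaces inspecting the Maven dependency tree with mvn dependency:tree.

```java
import java.security.CodeSource;

final class ClasspathCheck {

    static final String NOT_FOUND = "NOT FOUND on classpath";

    // Returns the location (usually a jar URL) the class is loaded from,
    // or NOT_FOUND if the class cannot be loaded.
    static String locate(String className) {
        try {
            // 'false' avoids running static initializers of the probed class.
            Class<?> clazz =
                    Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes that ship with the JDK typically report no code source.
            return source == null ? "JDK (no code source)" : String.valueOf(source.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return NOT_FOUND;
        }
    }

    public static void main(String[] args) {
        // The class named in the build error from this thread.
        System.out.println(locate("kafka.utils.VerifiableProperties"));
    }
}
```

Running this with the same classpath as the job shows whether the class is present and which artifact provides it. If two Kafka versions are on the classpath, the reported location only tells you which one was picked first.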
