Posted to user@flink.apache.org by aj <aj...@gmail.com> on 2020/01/16 13:52:41 UTC

Flink ParquetAvroWriters Sink

Hi All,

I have a use case where I am getting a different set of Avro records in
Kafka. I am using the schema registry to store Avro schema. One topic can
also have different types of records.

Now I have created a GenericRecord stream using KafkaAvroDeserializer by
defining a custom deserializer class like this:

@Override
public GenericRecord deserialize(
        byte[] messageKey, byte[] message, String topic, int partition, long offset) {
    checkInitialized();
    return (GenericRecord) inner.deserialize(topic, message);
}

private void checkInitialized() {
    if (inner == null) {
        Map<String, Object> props = new HashMap<>();
        props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
        props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient(
                        registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
        inner = new KafkaAvroDeserializer(client, props);
    }
}


And this is my consumer code:

DataStreamSource<GenericRecord> input = env
        .addSource(
                new FlinkKafkaConsumer010<GenericRecord>(topics,
                        new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                        config).setStartFromEarliest());

Now I want to write this stream partitioned by
*event_name="a"/year=/month=/day=* in Parquet format so that I can expose
Hive tables directly on top of this data.
event_name is a common field for all types of records that I am getting in
Kafka.
I am stuck because the Parquet writer needs a schema to write, but my
different records have different schemas. So how do I write this stream to S3
in the above partition format?
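
A minimal sketch of how the partition path above could be derived for a single record. It assumes each record carries an epoch-millisecond timestamp and that partitions are cut in UTC; both are assumptions, and the class name PartitionPath is hypothetical. With Flink's StreamingFileSink, a string like this is what a custom BucketAssigner would return from getBucketId.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class PartitionPath {
    // Hive-style date part of the path; UTC is an assumption, not something stated in the thread.
    private static final DateTimeFormatter DATE_PART =
            DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd").withZone(ZoneOffset.UTC);

    /** Builds event_name=<name>/year=<yyyy>/month=<MM>/day=<dd> for one record. */
    static String of(String eventName, long epochMillis) {
        return "event_name=" + eventName + "/" + DATE_PART.format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        // 1579182761000L is 2020-01-16T13:52:41Z.
        System.out.println(of("a", 1579182761000L));
    }
}
```

This only solves the directory layout. The Parquet writer still needs one schema per output, which is the question raised above.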


Thanks & Regards,
Anuj Jain



<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Flink ParquetAvroWriters Sink

Posted by aj <aj...@gmail.com>.
Hi Arvid,

I am not clear about this: "Note that I still recommend to just bundle the
schema with your Flink application and not reinvent the wheel."

Can you please help with some sample code showing how it should be written? Or
can we connect in some way so that I can go through it with you?


On Thu, Jan 23, 2020 at 2:09 PM Arvid Heise <ar...@ververica.com> wrote:

> The issue is that you are not providing any meaningful type information,
> so that Flink has to resort to Kryo. You need to extract the schema during
> query compilation (in your main) and pass it to your deserialization schema.
>
> public TypeInformation<T> getProducedType() {
>       return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.schema);
> }
>
> If you don't want to extract it statically you need to tell Flink how to
> handle arbitrary GenericRecords. You could implement your own serializer
> [1], which would write GenericRecords to byte[] and vice versa.
>
> Note that I still recommend to just bundle the schema with your Flink
> application and not reinvent the wheel.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/custom_serializers.html
>
> On Thu, Jan 23, 2020 at 2:22 AM aj <aj...@gmail.com> wrote:
>
>>  Hi Arvid,
>>
>> I want to keep generic records only. I do not want to keep the schema
>> definition on the consumer side; it should be resolved from the schema
>> registry only. I am following the post below:
>>
>>
>> https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without/59865360#59865360
>>
>> So please help me find what is wrong with my code.
>>
>>
>>
>> On Thu, Jan 23, 2020, 00:38 Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Anuj,
>>>
>>> I recommend using the ConfluentRegistryAvroDeserializationSchema [1]
>>> with a specific record that has been generated with the Avro Maven Plugin
>>> [2] or Avro Gradle Plugin [3]. That should result in almost no code and
>>> maximal maintainability.
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
>>> [2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
>>> [3] https://github.com/davidmc24/gradle-avro-plugin
>>>
>>> On Wed, Jan 22, 2020 at 6:43 PM aj <aj...@gmail.com> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> I have implemented the code with the envelope schema as you suggested, but
>>>> now I am facing issues with the consumer. I have written code like this:
>>>>
>>>> FlinkKafkaConsumer010 kafkaConsumer010 = new FlinkKafkaConsumer010(KAFKA_TOPICS,
>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>                 properties);
>>>>
>>>> And the deserialization class looks like this:
>>>>
>>>> public class KafkaGenericAvroDeserializationSchema
>>>>         implements KeyedDeserializationSchema<GenericRecord> {
>>>>
>>>>     private final String registryUrl;
>>>>     private transient KafkaAvroDeserializer inner;
>>>>
>>>>     public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>>>>         this.registryUrl = registryUrl;
>>>>     }
>>>>
>>>>     @Override
>>>>     public GenericRecord deserialize(byte[] messageKey, byte[] message,
>>>>             String topic, int partition, long offset) {
>>>>         checkInitialized();
>>>>         return (GenericRecord) inner.deserialize(topic, message);
>>>>     }
>>>>
>>>>     @Override
>>>>     public boolean isEndOfStream(GenericRecord nextElement) {
>>>>         return false;
>>>>     }
>>>>
>>>>     @Override
>>>>     public TypeInformation<GenericRecord> getProducedType() {
>>>>         return TypeExtractor.getForClass(GenericRecord.class);
>>>>     }
>>>>
>>>>     private void checkInitialized() {
>>>>         if (inner == null) {
>>>>             Map<String, Object> props = new HashMap<>();
>>>>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>>>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>>             SchemaRegistryClient client =
>>>>                     new CachedSchemaRegistryClient(
>>>>                             registryUrl,
>>>>                             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>             inner = new KafkaAvroDeserializer(client, props);
>>>>         }
>>>>     }
>>>> }
>>>>
>>>>
>>>> It works locally on my machine, but when I deploy it on a YARN
>>>> cluster I get the exception below:
>>>>
>>>>
>>>> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>>     at java.lang.Thread.run(Thread.java:748)
>>>> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>>>>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>>>>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
>>>>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
>>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
>>>> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
>>>> Serialization trace:
>>>> types (org.apache.avro.Schema$UnionSchema)
>>>> schema (org.apache.avro.Schema$Field)
>>>> fieldMap (org.apache.avro.Schema$RecordSchema)
>>>> schema (org.apache.avro.generic.GenericData$Record)
>>>>     at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:136)
>>>>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
>>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>>>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
>>>>     ... 13 more
>>>> Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$LockableArrayList with modifiers "public"
>>>>
>>>> Please help me to resolve this issue.
>>>>
>>>> Thanks,
>>>> Anuj
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Jan 20, 2020 at 9:42 PM aj <aj...@gmail.com> wrote:
>>>>
>>>>> Thanks, Arvid for all the clarification. I will work on the approach
>>>>> you suggested.
>>>>>
>>>>> Thanks,
>>>>> Anuj
>>>>>
>>>>> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Anuj,
>>>>>>
>>>>>> I think that there may be a fundamental misunderstanding about the
>>>>>> role of a schema registry in Kafka. So let me first clarify that.
>>>>>> In each Avro/Parquet file, all records have the same schema. The
>>>>>> schema is stored within the file, such that we can always retrieve the
>>>>>> writer schema for the records.
>>>>>> When Avro was first applied to Kafka, there was the basic question on
>>>>>> how the writer schema for any record is known to the consumer. Storing the
>>>>>> complete schema on each record would mean that each record would be much
>>>>>> larger than needed. Hence, they added the schema registry that assigns a
>>>>>> unique id to schema, which is then embedded into the records.
>>>>>> Now, whenever I update a schema in my producer, I would have old
>>>>>> records with the old schema id and new records with the new schema id.
>>>>>> In my consumer, I'd use a fixed reader schema, such that my
>>>>>> application would not need to worry if the record is written with old or
>>>>>> new schema; my consumer would only see records with the reader schema.
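
A small sketch of how the embedded schema id can be read from a raw Kafka message. It relies on the layout used by the Confluent serializers (one magic byte with value 0, then the schema id as a 4-byte big-endian integer, then the Avro binary payload); the class name WireFormat and the sample bytes are hypothetical.

```java
import java.nio.ByteBuffer;

final class WireFormat {
    static final byte MAGIC_BYTE = 0x0;

    /** Reads the 4-byte big-endian schema id that follows the magic byte. */
    static int schemaId(byte[] message) {
        if (message == null || message.length < 5) {
            throw new IllegalArgumentException("message is shorter than the 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Header for schema id 42, followed by two made-up payload bytes.
        byte[] message = {0, 0, 0, 0, 42, 2, 97};
        System.out.println(schemaId(message));
    }
}
```

The id only tells the consumer which writer schema to fetch. Which reader schema the application resolves to is still a choice made in the consumer.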
>>>>>>
>>>>>> Given that background information, you see that in general, it's
>>>>>> impossible with a generic approach to write the parquet with the same
>>>>>> schema as it has been written in Kafka: the parquet schema needs to be
>>>>>> supplied statically during query compilation while the actual used Avro
>>>>>> schema in Kafka is only known when actually consuming data.
>>>>>>
>>>>>> But looking further down the road:
>>>>>> * since you need one schema to write the parquet files, you'd need to
>>>>>> decide: do you want to write with the new or the old schema in case of a
>>>>>> schema update? That should also be the reader schema of your application
>>>>>> for a given event type.
>>>>>> * this decision has further implications: your application needs to
>>>>>> extract exactly one specific version of the schema from the schema registry
>>>>>> at query compilation. That could be either a specific schema id or the
>>>>>> latest schema for the event type.
>>>>>> * that means that the output schema is locked until you restart your
>>>>>> application and fetch a new latest schema in case of an update.
>>>>>> * at that point, it might just be easier to use the approach that I
>>>>>> outlined previously by bundling a specific schema with your application.
>>>>>>
>>>>>> If you want to extract the latest schema for a subject:
>>>>>>
>>>>>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
>>>>>> // getAllVersions returns version numbers, not schema ids, so ask for the latest metadata directly
>>>>>> var metadata = registryClient.getLatestSchemaMetadata(<subject>);
>>>>>> var schema = new Schema.Parser().parse(metadata.getSchema());
>>>>>>
>>>>>>
>>>>>> On Sat, Jan 18, 2020 at 5:22 PM aj <aj...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks, Arvid.
>>>>>>>
>>>>>>> I do not fully understand the above approach,
>>>>>>> so for now I am thinking of going with the envelope approach that
>>>>>>> you suggested.
>>>>>>>
>>>>>>> I have one more question: I do not want to keep the schema in my
>>>>>>> consumer project, even if it is a single envelope schema. I want it to be
>>>>>>> fetched from the schema registry and passed to my Parquet sink so that I
>>>>>>> always use the same schema that the producer uses. Can you provide sample
>>>>>>> code showing how I can infer the schema from the generic record or get it
>>>>>>> from the schema registry?
>>>>>>>
>>>>>>>
>>>>>>> Regards,
>>>>>>> Anuj
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> (Readded user mailing list)
>>>>>>>>
>>>>>>>> Hi Anuj,
>>>>>>>>
>>>>>>>> since I'd still recommend going with distinct sources/sinks, let me
>>>>>>>> try to solve your issues in this mail. If that doesn't work out, I'd
>>>>>>>> address your concerns about the envelope format later.
>>>>>>>>
>>>>>>>> In Flink, you can have several subtopologies in the same
>>>>>>>> application.
>>>>>>>>
>>>>>>>> Thus, for each event type, you can add
>>>>>>>> AvroSource(eventType) -> generic transformation/validation ->
>>>>>>>> AvroSink(eventType)
>>>>>>>> for each event.
>>>>>>>>
>>>>>>>> I'd put all avro schema in one project and use an avro plugin to
>>>>>>>> generate the respective Java Classes. Then I'd simply create a map of Avro
>>>>>>>> Schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
>>>>>>>> (event-a, event-b, ...).
>>>>>>>> Next, I'd iterate over the list to add the respective subtopologies
>>>>>>>> to env.
>>>>>>>> Finally, execute everything.
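
The wiring loop described above can be sketched without any Flink dependency. This is only a plan of the chains, not a job: the class SubtopologyPlan, the schema names, and the base path are hypothetical, and in a real application each planned chain would become one source, transformation, and sink added to the same environment before a single execute call.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SubtopologyPlan {
    /** Returns one chain description per event type: source -> shared validation -> sink. */
    static List<String> plan(Map<String, String> schemaByTopic, String basePath) {
        List<String> chains = new ArrayList<>();
        for (Map.Entry<String, String> entry : schemaByTopic.entrySet()) {
            String topic = entry.getKey();
            String schemaName = entry.getValue();
            // Source and sink share the same schema, so every output has exactly one schema.
            chains.add("source(" + topic + ", " + schemaName + ") -> validate -> sink("
                    + basePath + "/" + topic + ", " + schemaName + ")");
        }
        return chains;
    }

    public static void main(String[] args) {
        Map<String, String> schemaByTopic = new LinkedHashMap<>();
        schemaByTopic.put("event-a", "GeneratedClassA");
        schemaByTopic.put("event-b", "GeneratedClassB");
        plan(schemaByTopic, "s3://bucket/events").forEach(System.out::println);
    }
}
```

Adding a new event type then means adding one map entry and redeploying, which matches the trade-off described in the message.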
>>>>>>>>
>>>>>>>> You have one project where all validations reside. But you'd have
>>>>>>>> almost no overhead to process a given source of eventType. The downside of
>>>>>>>> that approach is of course, that each new event type would require a
>>>>>>>> redeployment, but that seems like what you'd want to do anyhow.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <aj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks, Arvid.
>>>>>>>>>
>>>>>>>>> 1. I like your approach as I can write a single consumer and put
>>>>>>>>> the data in S3 in Parquet format. The only challenge is that there are
>>>>>>>>> extra columns that will always be null, since each record carries only
>>>>>>>>> one type of event.
>>>>>>>>>
>>>>>>>>> 2. If I go with separate schemas, I am not sure how I can solve this
>>>>>>>>> using a single generalized consumer. My understanding so far is that I
>>>>>>>>> have to write a consumer for each type of event. Each consumer will read
>>>>>>>>> the whole data, filter its own events, and pass the resulting stream to
>>>>>>>>> a sink. But this does not look like a scalable solution: as new events
>>>>>>>>> keep being added, I have to write a consumer for each new type.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>         .addSource(
>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>>
>>>>>>>>> Example:
>>>>>>>>>
>>>>>>>>> *1st Consumer:*
>>>>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>> DataStream<GenericRecord> aInput =
>>>>>>>>>         input.filter(r -> "a".equals(String.valueOf(r.get("event_name"))));
>>>>>>>>>
>>>>>>>>> *2nd Consumer:*
>>>>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>> DataStream<GenericRecord> bInput =
>>>>>>>>>         input.filter(r -> "b".equals(String.valueOf(r.get("event_name"))));
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Can you help me solve this using a single consumer? I do
>>>>>>>>> not want to write a separate consumer for each type of schema.
>>>>>>>>>
>>>>>>>>> For example, this is my consumer, which receives different types of
>>>>>>>>> records.
>>>>>>>>>
>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>         .addSource(
>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>>
>>>>>>>>> Now I cannot write this stream directly, as there is no common
>>>>>>>>> schema for the records in this stream. So one possible way I am thinking of is:
>>>>>>>>>
>>>>>>>>> 1. Can I create multiple streams from this stream using keyBy
>>>>>>>>> on *"event_name"* and then write each stream separately?
>>>>>>>>>
>>>>>>>>> I just want to know whether this is possible.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Anuj
>>>>>>>>>
>>>>>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Anuj,
>>>>>>>>>>
>>>>>>>>>> I originally understood that you would like to store data in the
>>>>>>>>>> same Kafka topic and also want to store it in the same parquet file. In the
>>>>>>>>>> past, I mostly used schema registry with Kafka Streams, where you could
>>>>>>>>>> only store a schema for a key and value respectively. To use different
>>>>>>>>>> record types in the same kafka topic, you had to disable schema
>>>>>>>>>> compatibility checks and just stored the schemas as different versions
>>>>>>>>>> under the same subject.
>>>>>>>>>>
>>>>>>>>>> Your approach is much better. You can ensure full schema
>>>>>>>>>> compatibility. Nevertheless, it still shares the same drawback that
>>>>>>>>>> consumption is much harder (using GenericRecord is proof of that) if you
>>>>>>>>>> want to read/write everything into the same place. Also you will never be
>>>>>>>>>> able to write one consistent file, as they can only have one schema (both
>>>>>>>>>> on Avro and Parquet).
>>>>>>>>>> So you only have two options:
>>>>>>>>>> * keep schemas separated, but then you also need to write
>>>>>>>>>> separate files per record type.
>>>>>>>>>> * have a common schema (either my outlined approach or any other
>>>>>>>>>> wrapper schema).
>>>>>>>>>> The approach with a common schema makes only sense if you want to
>>>>>>>>>> write it into one table/kafka topic.
>>>>>>>>>>
>>>>>>>>>> However, in the last mail you pointed out that you actually want
>>>>>>>>>> to store the record types separately. Then, you should keep everything
>>>>>>>>>> separated. Then you should have a sink for each type each getting the
>>>>>>>>>> respective schema. Note that you'd need to fetch the schema manually from
>>>>>>>>>> the schema registry when creating the query as you would need to pass it to
>>>>>>>>>> the sink.
>>>>>>>>>>
>>>>>>>>>> Btw, do you actually have a need to write all events into one
>>>>>>>>>> Kafka topic? The only real use case is to preserve the time order per key.
>>>>>>>>>> Everything else is much more complicated than storing events individually.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Arvid
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <aj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Arvid,
>>>>>>>>>>> Thanks for the quick response. I am new to this Avro design, so
>>>>>>>>>>> can you please help me understand it and design for my use case?
>>>>>>>>>>>
>>>>>>>>>>> My use case is like this:
>>>>>>>>>>>
>>>>>>>>>>> 1. We have an app where a lot of actions happen on the user
>>>>>>>>>>> side.
>>>>>>>>>>> 2. For each action we collect a set of information that is
>>>>>>>>>>> defined as key-value pairs. We want to define this information with
>>>>>>>>>>> proper schemas so that we maintain the proper format and do not push
>>>>>>>>>>> random data.
>>>>>>>>>>> 3. So we define a schema for each action and register it in
>>>>>>>>>>> the schema registry using topic+record.name as the subject.
>>>>>>>>>>> 4. So I do not think the producer side has any issue, as whenever
>>>>>>>>>>> we push an event to Kafka we register a new schema under the above subject.
>>>>>>>>>>>
>>>>>>>>>>> Example:
>>>>>>>>>>>
>>>>>>>>>>> {
>>>>>>>>>>>   "event_name": "a",
>>>>>>>>>>>   "timestamp": ...,
>>>>>>>>>>>   "properties": {
>>>>>>>>>>>     "key-1": "val-1",
>>>>>>>>>>>     "key-2": "val-2"
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> {
>>>>>>>>>>>   "event_name": "b",
>>>>>>>>>>>   "timestamp": ...,
>>>>>>>>>>>   "properties": {
>>>>>>>>>>>     "key-3": "val-3",
>>>>>>>>>>>     "key-4": "val-4"
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Now I have a consumer that parses the data by fetching the
>>>>>>>>>>> schema from the schema registry and deserializes it into a GenericRecord stream.
>>>>>>>>>>>
>>>>>>>>>>> Why do you think it will break, given that I always deserialize with
>>>>>>>>>>> the writer schema only?
>>>>>>>>>>>
>>>>>>>>>>> You suggested keeping one envelope Avro schema rather than a
>>>>>>>>>>> separate schema for each type of event that I am generating. I have some
>>>>>>>>>>> doubts about that:
>>>>>>>>>>>
>>>>>>>>>>> 1. How do I enforce a schema on each event if it is a subtype in the
>>>>>>>>>>> main schema? When I get a JSON event of type "a", how do I enforce
>>>>>>>>>>> and convert it to the subschema for "a" and push it to Kafka?
>>>>>>>>>>> 2. I want to create a separate Hive table for each of the events.
>>>>>>>>>>> If I write this data and, let's say, I have 20 events, then 19
>>>>>>>>>>> columns will always be null in the data.
>>>>>>>>>>>
>>>>>>>>>>> Please help me do this the right way. It will be a great help
>>>>>>>>>>> and learning for me.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Anuj
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Anuj,
>>>>>>>>>>>>
>>>>>>>>>>>> you should always avoid having records with different schemas
>>>>>>>>>>>> in the same topic/dataset. You will break the compatibility features of the
>>>>>>>>>>>> schema registry, and your consumer/producer code will always be hard to maintain.
>>>>>>>>>>>>
>>>>>>>>>>>> A common and scalable way to avoid it is to use some kind of
>>>>>>>>>>>> envelope format.
>>>>>>>>>>>>
>>>>>>>>>>>> {
>>>>>>>>>>>>   "namespace": "example",
>>>>>>>>>>>>   "name": "Envelope",
>>>>>>>>>>>>   "type": "record",
>>>>>>>>>>>>   "fields": [
>>>>>>>>>>>>     {
>>>>>>>>>>>>       "name": "type1",
>>>>>>>>>>>>       "type": ["null", {
>>>>>>>>>>>>         "type": "record",
>>>>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>>>>       }],
>>>>>>>>>>>>       "default": null
>>>>>>>>>>>>     },
>>>>>>>>>>>>     {
>>>>>>>>>>>>       "name": "type2",
>>>>>>>>>>>>       "type": ["null", {
>>>>>>>>>>>>         "type": "record",
>>>>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>>>>       }],
>>>>>>>>>>>>       "default": null
>>>>>>>>>>>>     }
>>>>>>>>>>>>   ]
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> This envelope is evolvable (arbitrary addition/removal of
>>>>>>>>>>>> wrapped types, which by themselves can be evolved), and adds only a little
>>>>>>>>>>>> overhead (1 byte per subtype). The downside is that you cannot enforce that
>>>>>>>>>>>> exactly one of the subtypes is set.
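
Because the envelope schema itself cannot enforce that exactly one subtype is set, one option is to check it in the application before writing. The sketch below models an envelope record as a plain map from field name to value (an assumption made to keep the example free of Avro dependencies); the class name EnvelopeCheck is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class EnvelopeCheck {
    /** Returns the name of the single populated subtype field; throws if none or several are set. */
    static String populatedSubtype(Map<String, Object> envelope) {
        String found = null;
        for (Map.Entry<String, Object> field : envelope.entrySet()) {
            if (field.getValue() == null) {
                continue; // this branch of the ["null", record] union is not used
            }
            if (found != null) {
                throw new IllegalArgumentException(
                        "more than one subtype set: " + found + ", " + field.getKey());
            }
            found = field.getKey();
        }
        if (found == null) {
            throw new IllegalArgumentException("no subtype set");
        }
        return found;
    }

    public static void main(String[] args) {
        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("type1", null);
        envelope.put("type2", "payload of type2");
        System.out.println(populatedSubtype(envelope));
    }
}
```

The returned field name can also serve as the routing key when each subtype should end up in its own table.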
>>>>>>>>>>>>
>>>>>>>>>>>> This schema is fully compatible with the schema registry, so no
>>>>>>>>>>>> need to parse anything manually.
>>>>>>>>>>>>
>>>>>>>>>>>> This schema can easily be used with Parquet. If you can't
>>>>>>>>>>>> change the input format anymore, you can at least use that approach on your
>>>>>>>>>>>> output.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>>
>>>>>>>>>>>> Arvid
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Thanks & Regards,
>>>>>>>>>>> Anuj Jain
>>>>>>>>>>> Mob. : +91- 8588817877
>>>>>>>>>>> Skype : anuj.jain07
>>>>>>>>>>> <http://www.oracle.com/>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Flink ParquetAvroWriters Sink

Posted by Arvid Heise <ar...@ververica.com>.
The issue is that you are not providing any meaningful type information,
so that Flink has to resort to Kryo. You need to extract the schema during
query compilation (in your main) and pass it to your deserialization schema.

public TypeInformation<T> getProducedType() {
      return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.schema);
}

If you don't want to extract it statically you need to tell Flink how to
handle arbitrary GenericRecords. You could implement your own serializer
[1], which would write GenericRecords to byte[] and vice versa.

Note that I still recommend just bundling the schema with your Flink
application rather than reinventing the wheel.
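
As a rough illustration of the bundling idea, here is a minimal sketch that
reads a schema definition packaged inside the job jar, using only the JDK.
The class name BundledSchemaLoader and the resource path are hypothetical;
nothing in this thread prescribes them. In a real job the returned text would
presumably be parsed with Avro's Schema.Parser in main and the resulting
Schema handed to the deserialization schema, so that getProducedType can
return a GenericRecordAvroTypeInfo as shown above.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: loads an Avro schema file (.avsc) bundled with the application.
final class BundledSchemaLoader {

    /** Reads the whole stream as UTF-8 text. */
    static String readUtf8(InputStream in) {
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int n;
            while ((n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Loads the schema text from the classpath, e.g. "/avro/envelope.avsc" (assumed path). */
    static String loadSchemaText(String resourcePath) {
        InputStream in = BundledSchemaLoader.class.getResourceAsStream(resourcePath);
        if (in == null) {
            throw new IllegalArgumentException("Schema resource not found: " + resourcePath);
        }
        try (InputStream stream = in) {
            return readUtf8(stream);
        } catch (IOException e) {
            // Only close() can still fail here; surface it unchecked.
            throw new UncheckedIOException(e);
        }
    }
}
```

Failing fast on a missing resource means a packaging mistake shows up when
the job is built, not when the first record arrives on the cluster.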

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/custom_serializers.html

On Thu, Jan 23, 2020 at 2:22 AM aj <aj...@gmail.com> wrote:

>  Hi Arvid,
>
> I want to keep generic records only. I do not want to keep the schema
> definition on the consumer side; it should be resolved from the schema
> registry only. I am following the post below:
>
>
> https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without/59865360#59865360
>
> So please help me find what is wrong with my code.
>
>
>
> On Thu, Jan 23, 2020, 00:38 Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Anuj,
>>
>> I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with
>> a specific record that has been generated with the Avro Maven Plugin [2] or
>> Avro Gradle Plugin [3]. That should result in almost no code and maximal
>> maintainability.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
>> [2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
>> [3] https://github.com/davidmc24/gradle-avro-plugin
>>
>> On Wed, Jan 22, 2020 at 6:43 PM aj <aj...@gmail.com> wrote:
>>
>>> Hi Arvid,
>>>
>>> I have implemented the code with the envelope schema as you suggested, but
>>> now I am facing issues with the consumer. I have written code like this:
>>>
>>> FlinkKafkaConsumer010 kafkaConsumer010 = new
>>> FlinkKafkaConsumer010(KAFKA_TOPICS,
>>>                 new
>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>                 properties);
>>>
>>> And the Deserialization class looks like this :
>>>
>>> public class KafkaGenericAvroDeserializationSchema implements
>>> KeyedDeserializationSchema<GenericRecord> {
>>>
>>>     private final String registryUrl;
>>>     private transient KafkaAvroDeserializer inner;
>>>
>>>     public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>>>         this.registryUrl = registryUrl;
>>>     }
>>>
>>>     @Override
>>>     public GenericRecord deserialize(byte[] messageKey, byte[] message,
>>> String topic, int partition, long offset) {
>>>         checkInitialized();
>>>         return (GenericRecord) inner.deserialize(topic, message);
>>>     }
>>>
>>>     @Override
>>>     public boolean isEndOfStream(GenericRecord nextElement) {
>>>         return false;
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<GenericRecord> getProducedType() {
>>>         return TypeExtractor.getForClass(GenericRecord.class);
>>>     }
>>>
>>>     private void checkInitialized() {
>>>         if (inner == null) {
>>>             Map<String, Object> props = new HashMap<>();
>>>
>>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>> registryUrl);
>>>
>>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>             SchemaRegistryClient client =
>>>                     new CachedSchemaRegistryClient(
>>>                             registryUrl,
>>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>             inner = new KafkaAvroDeserializer(client, props);
>>>         }
>>>     }
>>> }
>>>
>>>
>>> It works locally on my machine, but when I deploy it on a YARN
>>> cluster I get the exception below:
>>>
>>>
>>> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
>>> ExceptionInChainedOperatorException: Could not forward element to next
>>> operator
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> SourceStreamTask$LegacySourceFunctionThread
>>> .checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
>>> .performDefaultAction(SourceStreamTask.java:132)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(
>>> StreamTask.java:298)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>>> StreamTask.java:403)
>>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: org.apache.flink.streaming.runtime.tasks.
>>> ExceptionInChainedOperatorException: Could not forward element to next
>>> operator
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:
>>> 654)
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>>     at org.apache.flink.streaming.api.operators.
>>> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator
>>> .java:727)
>>>     at org.apache.flink.streaming.api.operators.
>>> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator
>>> .java:705)
>>>     at org.apache.flink.streaming.api.operators.
>>> StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts
>>> .java:104)
>>>     at org.apache.flink.streaming.api.operators.
>>> StreamSourceContexts$NonTimestampContext.collectWithTimestamp(
>>> StreamSourceContexts.java:111)
>>>     at org.apache.flink.streaming.connectors.kafka.internals.
>>> AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.
>>> Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
>>>     at org.apache.flink.streaming.connectors.kafka.internal.
>>> Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
>>>     at org.apache.flink.streaming.connectors.kafka.
>>> FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(
>>> StreamSource.java:100)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(
>>> StreamSource.java:63)
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:
>>> 202)
>>> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
>>> instance of class: org.apache.avro.Schema$LockableArrayList
>>> Serialization trace:
>>> types (org.apache.avro.Schema$UnionSchema)
>>> schema (org.apache.avro.Schema$Field)
>>> fieldMap (org.apache.avro.Schema$RecordSchema)
>>> schema (org.apache.avro.generic.GenericData$Record)
>>>     at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase
>>> .scala:136)
>>>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer
>>> .create(CollectionSerializer.java:89)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
>>> CollectionSerializer.java:93)
>>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
>>> CollectionSerializer.java:22)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(
>>> ObjectField.java:106)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>>> FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(
>>> ObjectField.java:106)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>>> FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>>> MapSerializer.java:143)
>>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>>> MapSerializer.java:21)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(
>>> ObjectField.java:106)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>>> FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(
>>> ObjectField.java:106)
>>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>>> FieldSerializer.java:528)
>>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
>>> .copy(KryoSerializer.java:262)
>>>     at org.apache.flink.streaming.runtime.tasks.
>>> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:
>>> 635)
>>>     ... 13 more
>>> Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.
>>> Instantiators$$anonfun$normalJava$1 can not access a member of class
>>> org.apache.avro.Schema$LockableArrayList with modifiers "public"
>>>
>>> Please help me to resolve this issue.
>>>
>>> Thanks,
>>> Anuj
>>>
>>>
>>>
>>>
>>>
>>> On Mon, Jan 20, 2020 at 9:42 PM aj <aj...@gmail.com> wrote:
>>>
>>>> Thanks, Arvid for all the clarification. I will work on the approach
>>>> you suggested.
>>>>
>>>> Thanks,
>>>> Anuj
>>>>
>>>> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> Hi Anuj,
>>>>>
>>>>> I think that there may be a fundamental misunderstanding about the
>>>>> role of a schema registry in Kafka. So let me first clarify that.
>>>>> In each Avro/Parquet file, all records have the same schema. The
>>>>> schema is stored within the file, such that we can always retrieve the
>>>>> writer schema for the records.
>>>>> When Avro was first applied to Kafka, there was the basic question on
>>>>> how the writer schema for any record is known to the consumer. Storing the
>>>>> complete schema on each record would mean that each record would be much
>>>>> larger than needed. Hence, they added the schema registry that assigns a
>>>>> unique id to schema, which is then embedded into the records.
>>>>> Now, whenever I update a schema in my producer, I would have old
>>>>> records with the old schema id and new records with the new schema id.
>>>>> In my consumer, I'd use a fixed reader schema, such that my
>>>>> application would not need to worry if the record is written with old or
>>>>> new schema; my consumer would only see records with the reader schema.
>>>>>
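
To make the "id embedded into the records" part concrete: Confluent's Avro
serializer prefixes every message value with a magic byte (0) followed by
the schema id as a 4-byte big-endian integer, and the Avro-encoded payload
comes after that. The following is only a sketch of reading that header with
the JDK; the class and method names are made up for illustration, and in
practice KafkaAvroDeserializer does this for you.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative only: inspects the Confluent wire-format header of a Kafka message value.
final class ConfluentWireFormat {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4 bytes of schema id

    /** Returns the schema registry id stored in the message header. */
    static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message too short for a schema registry header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buffer.getInt();
    }

    /** Returns the Avro-encoded bytes that follow the header. */
    static byte[] avroPayload(byte[] message) {
        schemaId(message); // validates the header first
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }
}
```

The id only tells the consumer which writer schema to fetch; which reader
schema the application uses is still its own decision.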
>>>>> Given that background information, you see that in general, it's
>>>>> impossible with a generic approach to write the parquet with the same
>>>>> schema as it has been written in Kafka: the parquet schema needs to be
>>>>> supplied statically during query compilation while the actual used Avro
>>>>> schema in Kafka is only known when actually consuming data.
>>>>>
>>>>> But looking further down the road:
>>>>> * since you need one schema to write the parquet files, you'd need to
>>>>> decide: do you want to write with the new or the old schema in case of a
>>>>> schema update? That should also be the reader schema of your application
>>>>> for a given event type.
>>>>> * this decision has further implications: your application needs to
>>>>> extract exactly one specific version of the schema from the schema registry
>>>>> at query compilation. That could be either a specific schema id or the
>>>>> latest schema for the event type.
>>>>> * that means that the output schema is locked until you restart your
>>>>> application and fetch a new latest schema in case of an update.
>>>>> * at that point, it might just be easier to use the approach that I
>>>>> outlined previously by bundling a specific schema with your application.
>>>>>
>>>>> If you want to extract the latest schema for a subject:
>>>>>
>>>>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
>>>>> // getAllVersions returns version numbers, not schema ids, so ask for the latest metadata
>>>>> var metadata = registryClient.getLatestSchemaMetadata(<subject>);
>>>>> var schema = new Schema.Parser().parse(metadata.getSchema());
>>>>>
>>>>>
>>>>> On Sat, Jan 18, 2020 at 5:22 PM aj <aj...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Arvid.
>>>>>>
>>>>>> I do not fully understand the above approach,
>>>>>> so for now I am planning to go with the envelope approach that you
>>>>>> suggested.
>>>>>>
>>>>>> One more question: I do not want to keep the schema in my consumer
>>>>>> project, even if it is a single envelope schema. I want it to be fetched
>>>>>> from the schema registry and passed to my parquet sink so that I always use
>>>>>> the same schema that is used by the producer. Can you provide sample
>>>>>> code showing how I can infer the schema from the generic record or get it
>>>>>> from the schema registry?
>>>>>>
>>>>>>
>>>>>> Regards,
>>>>>> Anuj
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> (Readded user mailing list)
>>>>>>>
>>>>>>> Hi Anuj,
>>>>>>>
>>>>>>> since I'd still recommend going with distinct sources/sinks, let me
>>>>>>> try to solve your issues in this mail. If that doesn't work out, I'd
>>>>>>> address your concerns about the envelope format later.
>>>>>>>
>>>>>>> In Flink, you can have several subtopologies in the same application.
>>>>>>>
>>>>>>> Thus, for each event type, you can add
>>>>>>> AvroSource(eventType) -> generic transformation/validation ->
>>>>>>> AvroSink(eventType)
>>>>>>> for each event.
>>>>>>>
>>>>>>> I'd put all Avro schemas in one project and use an Avro plugin to
>>>>>>> generate the respective Java classes. Then I'd simply create a map of Avro
>>>>>>> schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
>>>>>>> (event-a, event-b, ...).
>>>>>>> Next, I'd iterate over the map to add the respective subtopologies
>>>>>>> to env.
>>>>>>> Finally, execute everything.
>>>>>>>
>>>>>>> You have one project where all validations reside. But you'd have
>>>>>>> almost no overhead to process a given source of eventType. The downside of
>>>>>>> that approach is of course, that each new event type would require a
>>>>>>> redeployment, but that seems like what you'd want to do anyhow.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
>>>>>>>
>>>>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <aj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks, Arvid.
>>>>>>>>
>>>>>>>> 1. I like your approach as I can write a single consumer and put
>>>>>>>> the data in S3 in parquet format. The only challenge is that there are extra
>>>>>>>> columns that will always be null, as each record carries only one type of
>>>>>>>> event.
>>>>>>>>
>>>>>>>> 2. If I go with separate schemas, I am not sure how I can solve it
>>>>>>>> using a single generalized consumer. My understanding so far is that I
>>>>>>>> have to write a consumer for each type of event. Each consumer will read
>>>>>>>> the whole data, filter out its own events, and then I can
>>>>>>>> pass this stream to the sink. But this does not look like a scalable solution: as
>>>>>>>> new events keep being added, I have to write a consumer for each new type.
>>>>>>>>
>>>>>>>>
>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>         .addSource(
>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>                         new
>>>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>
>>>>>>>> Example :
>>>>>>>>
>>>>>>>> 1st Consumer:
>>>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>                 config).setStartFromEarliest());
>>>>>>>> // Avro usually returns strings as Utf8, so compare via String.valueOf
>>>>>>>> DataStream<GenericRecord> aInput = input.filter(
>>>>>>>>         r -> "a".equals(String.valueOf(r.get("event_name"))));
>>>>>>>>
>>>>>>>> 2nd Consumer:
>>>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>                 config).setStartFromEarliest());
>>>>>>>> DataStream<GenericRecord> bInput = input.filter(
>>>>>>>>         r -> "b".equals(String.valueOf(r.get("event_name"))));
>>>>>>>>
>>>>>>>>
>>>>>>>> Can you help me solve this using a single consumer, as I do
>>>>>>>> not want to write a separate consumer for each type of schema?
>>>>>>>>
>>>>>>>> For example, this is my consumer that contains different types of
>>>>>>>> records.
>>>>>>>>
>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>         .addSource(
>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>                         new
>>>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>
>>>>>>>> Now I cannot write this stream directly as there is no common
>>>>>>>> schema for the records in this stream. So one possible way I am considering is:
>>>>>>>>
>>>>>>>> 1. Can I create multiple streams from this stream using keyBy
>>>>>>>> on *"event_name"* and then write each stream separately?
>>>>>>>>
>>>>>>>> I just want to know whether this is possible.
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Anuj
>>>>>>>>
>>>>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Anuj,
>>>>>>>>>
>>>>>>>>> I originally understood that you would like to store data in the
>>>>>>>>> same Kafka topic and also want to store it in the same parquet file. In the
>>>>>>>>> past, I mostly used schema registry with Kafka Streams, where you could
>>>>>>>>> only store a schema for a key and value respectively. To use different
>>>>>>>>> record types in the same kafka topic, you had to disable schema
>>>>>>>>> compatibility checks and just stored the schemas as different versions
>>>>>>>>> under the same subject.
>>>>>>>>>
>>>>>>>>> Your approach is much better. You can ensure full schema
>>>>>>>>> compatibility. Nevertheless, it still shares the same drawback that
>>>>>>>>> consumption is much harder (using GenericRecord is proof of that) if you
>>>>>>>>> want to read/write everything into the same place. Also you will never be
>>>>>>>>> able to write one consistent file, as they can only have one schema (both
>>>>>>>>> on Avro and Parquet).
>>>>>>>>> So you only have two options:
>>>>>>>>> * keep schemas separated, but then you also need to write separate
>>>>>>>>> files per record type.
>>>>>>>>> * have a common schema (either my outlined approach or any other
>>>>>>>>> wrapper schema).
>>>>>>>>> The approach with a common schema makes only sense if you want to
>>>>>>>>> write it into one table/kafka topic.
>>>>>>>>>
>>>>>>>>> However, in the last mail you pointed out that you actually want
>>>>>>>>> to store the record types separately. In that case, you should keep everything
>>>>>>>>> separated and have one sink per type, each getting the
>>>>>>>>> respective schema. Note that you'd need to fetch the schema manually from
>>>>>>>>> the schema registry when creating the query as you would need to pass it to
>>>>>>>>> the sink.
>>>>>>>>>
>>>>>>>>> Btw, do you actually have a need to write all events into one
>>>>>>>>> Kafka topic? The only real use case is to preserve the time order per key.
>>>>>>>>> Everything else is much more complicated than storing events individually.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Arvid
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <aj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Arvid,
>>>>>>>>>> Thanks for the quick response. I am new to this Avro design, so
>>>>>>>>>> can you please help me understand and design for my use case?
>>>>>>>>>>
>>>>>>>>>> My use case is like this:
>>>>>>>>>>
>>>>>>>>>> 1. We have an app where a lot of actions happen on the user
>>>>>>>>>> side.
>>>>>>>>>> 2. For each action we collect a set of information that is
>>>>>>>>>> defined using key-value pairs. We want to define this information as
>>>>>>>>>> proper schemas so that we maintain the proper format and do not push random
>>>>>>>>>> data.
>>>>>>>>>> 3. So we define a schema for each action and register it in
>>>>>>>>>> the schema registry using topic+record.name as the subject.
>>>>>>>>>> 4. So I do not think the producer side has any issue, as whenever
>>>>>>>>>> we push an event to Kafka we register a new schema with the above subject.
>>>>>>>>>>
>>>>>>>>>> Example :
>>>>>>>>>>
>>>>>>>>>> {
>>>>>>>>>>   "event_name": "a",
>>>>>>>>>>   "timestamp": ...,
>>>>>>>>>>   "properties": {
>>>>>>>>>>     "key-1": "val-1",
>>>>>>>>>>     "key-2": "val-2"
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> {
>>>>>>>>>>   "event_name": "b",
>>>>>>>>>>   "timestamp": ...,
>>>>>>>>>>   "properties": {
>>>>>>>>>>     "key-3": "val-3",
>>>>>>>>>>     "key-4": "val-4"
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Now I have a consumer that will parse the data by fetching the
>>>>>>>>>> schema from the schema registry and deserializing into a GenericRecord stream.
>>>>>>>>>>
>>>>>>>>>> Why do you think it will break, given that I am always deserializing with
>>>>>>>>>> the writer schema only?
>>>>>>>>>>
>>>>>>>>>> You suggested keeping an envelope Avro schema rather than a separate
>>>>>>>>>> schema for each type of event that I am generating. I have some doubts
>>>>>>>>>> about that:
>>>>>>>>>>
>>>>>>>>>> 1. How do I enforce a schema on each event when it is a subtype in the
>>>>>>>>>> main schema? When I get a JSON event of type "a", how do I enforce
>>>>>>>>>> and convert it to the subschema type "a" and push it to Kafka?
>>>>>>>>>> 2. I want to create a separate hive table for each of the events,
>>>>>>>>>> so when I write this data and, let's say, I have 20 events, then for 19
>>>>>>>>>> columns I am always getting null values in the data.
>>>>>>>>>>
>>>>>>>>>> Please help me in doing this right way. It will be a great help
>>>>>>>>>> and learning for me.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Anuj
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Anuj,
>>>>>>>>>>>
>>>>>>>>>>> you should always avoid having records with different schemas in
>>>>>>>>>>> the same topic/dataset. You will break the compatibility features of the
>>>>>>>>>>> schema registry, and your consumer/producer code will be hard to maintain.
>>>>>>>>>>>
>>>>>>>>>>> A common and scalable way to avoid it is to use some kind of
>>>>>>>>>>> envelope format.
>>>>>>>>>>>
>>>>>>>>>>> {
>>>>>>>>>>>   "namespace": "example",
>>>>>>>>>>>   "name": "Envelope",
>>>>>>>>>>>   "type": "record",
>>>>>>>>>>>   "fields": [
>>>>>>>>>>>     {
>>>>>>>>>>>       "name": "type1",
>>>>>>>>>>>       "type": ["null", {
>>>>>>>>>>>         "type": "record",
>>>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>>>       }],
>>>>>>>>>>>       "default": null
>>>>>>>>>>>     },
>>>>>>>>>>>     {
>>>>>>>>>>>       "name": "type2",
>>>>>>>>>>>       "type": ["null", {
>>>>>>>>>>>         "type": "record",
>>>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>>>       }],
>>>>>>>>>>>       "default": null
>>>>>>>>>>>     }
>>>>>>>>>>>   ]
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> This envelope is evolvable (arbitrary addition/removal of
>>>>>>>>>>> wrapped types, which by themselves can be evolved), and adds only a little
>>>>>>>>>>> overhead (1 byte per subtype). The downside is that you cannot enforce that
>>>>>>>>>>> exactly one of the subtypes is set.
>>>>>>>>>>>
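
Since the schema itself cannot enforce "exactly one subtype is set", a check
at the producer or consumer is one way to close that gap. Below is a minimal
sketch, assuming the envelope fields have already been copied into a plain
Map from field name (type1, type2, ...) to value; the Map stands in for the
Avro record, and the class and method names are invented for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative check: an envelope is valid only if exactly one wrapped subtype is non-null.
final class EnvelopeCheck {

    /** Lists the names of all envelope fields that carry a value. */
    static List<String> populatedSubtypes(Map<String, Object> envelope) {
        List<String> populated = new ArrayList<>();
        for (Map.Entry<String, Object> field : envelope.entrySet()) {
            if (field.getValue() != null) {
                populated.add(field.getKey());
            }
        }
        return populated;
    }

    /** Returns the single populated subtype name, or throws if there are zero or several. */
    static String singleSubtype(Map<String, Object> envelope) {
        List<String> populated = populatedSubtypes(envelope);
        if (populated.size() != 1) {
            throw new IllegalArgumentException(
                    "Expected exactly one populated subtype but found " + populated);
        }
        return populated.get(0);
    }
}
```

The returned field name could also serve as the routing key if each subtype
later goes to its own sink.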
>>>>>>>>>>> This schema is fully compatible with the schema registry, so no
>>>>>>>>>>> need to parse anything manually.
>>>>>>>>>>>
>>>>>>>>>>> This schema can easily be used with Parquet. If you can't change
>>>>>>>>>>> the input format anymore, you can at least use that approach on your output.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>>
>>>>>>>>>>> Arvid
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <aj...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi All,
>>>>>>>>>>>>
>>>>>>>>>>>> I have a use case where I am getting a different set of Avro
>>>>>>>>>>>> records in Kafka. I am using the schema registry to store Avro schema. One
>>>>>>>>>>>> topic can also have different types of records.
>>>>>>>>>>>>
>>>>>>>>>>>> Now I have created a GenericRecord stream using
>>>>>>>>>>>> KafkaAvroDeserializer by defining a custom
>>>>>>>>>>>> deserializer class like this:
>>>>>>>>>>>>
>>>>>>>>>>>> @Override
>>>>>>>>>>>> public GenericRecord deserialize(
>>>>>>>>>>>> byte[] messageKey, byte[] message, String topic, int partition,
>>>>>>>>>>>> long offset) {
>>>>>>>>>>>> checkInitialized();
>>>>>>>>>>>> return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> private void checkInitialized() {
>>>>>>>>>>>> if (inner == null) {
>>>>>>>>>>>> Map<String, Object> props = new HashMap<>();
>>>>>>>>>>>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>>>>>>>>> registryUrl);
>>>>>>>>>>>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>>>>>>>>>> false);
>>>>>>>>>>>> SchemaRegistryClient client =
>>>>>>>>>>>> new CachedSchemaRegistryClient(
>>>>>>>>>>>> registryUrl,
>>>>>>>>>>>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>>>>> inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>>>>> }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> And this is my consumer code :
>>>>>>>>>>>>
>>>>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>>>>         .addSource(
>>>>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>>>>                         new
>>>>>>>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>>>>>
>>>>>>>>>>>> Now I want to write this stream partitioned on
>>>>>>>>>>>> *event_name="a"/year=/month=/day=* in parquet format so that I
>>>>>>>>>>>> can expose hive tables directly on top of this data.
>>>>>>>>>>>> event_name is a common field for all types of records that I am
>>>>>>>>>>>> getting in Kafka.
>>>>>>>>>>>> I am stuck as the parquet writer needs a schema to write, but my
>>>>>>>>>>>> different records have different schemas. So how do I write this stream to
>>>>>>>>>>>> S3 in the above partition format?
>>>>>>>>>>>>
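
For the partition layout itself, the path can be derived from the event name
and the record timestamp. This is a minimal sketch using only java.time; it
assumes the timestamp is epoch milliseconds and that partitions are cut in
UTC, neither of which is stated in the thread, and the class name is
hypothetical. With Flink's StreamingFileSink, logic like this would typically
sit in the getBucketId method of a custom BucketAssigner.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

// Illustrative helper: builds a Hive-style partition path for one event.
final class EventPartitionPath {

    /** Returns e.g. event_name=a/year=2020/month=01/day=16 for the given event. */
    static String bucketId(String eventName, long epochMillis) {
        // Assumption: timestamps are epoch milliseconds, partitions follow UTC days.
        ZonedDateTime time = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format(Locale.ROOT,
                "event_name=%s/year=%04d/month=%02d/day=%02d",
                eventName, time.getYear(), time.getMonthValue(), time.getDayOfMonth());
    }

    public static void main(String[] args) {
        System.out.println(bucketId("a", System.currentTimeMillis()));
    }
}
```

Zero-padding month and day keeps the directories sorted lexicographically,
which tends to be convenient when listing partitions.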
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks & Regards,
>>>>>>>>>>>> Anuj Jain
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>>>>>

Re: Flink ParquetAvroWriters Sink

Posted by aj <aj...@gmail.com>.
 Hi Arvid,

I want to keep generic records only. I do not want to keep the schema
definition on the consumer side; it should be resolved from the schema
registry only. I am following the post below:

https://stackoverflow.com/questions/58849635/is-it-possible-to-deserialize-avro-messageconsuming-message-from-kafka-without/59865360#59865360

So please help me find what is wrong with my code.



On Thu, Jan 23, 2020, 00:38 Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with
> a specific record that has been generated with the Avro Maven Plugin [2] or
> Avro Gradle Plugin [3]. That should result in almost no code and maximal
> maintainability.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
> [2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
> [3] https://github.com/davidmc24/gradle-avro-plugin
>
> On Wed, Jan 22, 2020 at 6:43 PM aj <aj...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> I have implemented the code with the envelope schema as you suggested, but now
>> I am facing issues with the consumer. I have written code like this:
>>
>> FlinkKafkaConsumer010 kafkaConsumer010 = new
>> FlinkKafkaConsumer010(KAFKA_TOPICS,
>>                 new
>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                 properties);
>>
>> And the Deserialization class looks like this :
>>
>> public class KafkaGenericAvroDeserializationSchema implements
>> KeyedDeserializationSchema<GenericRecord> {
>>
>>     private final String registryUrl;
>>     private transient KafkaAvroDeserializer inner;
>>
>>     public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>>         this.registryUrl = registryUrl;
>>     }
>>
>>     @Override
>>     public GenericRecord deserialize(byte[] messageKey, byte[] message,
>> String topic, int partition, long offset) {
>>         checkInitialized();
>>         return (GenericRecord) inner.deserialize(topic, message);
>>     }
>>
>>     @Override
>>     public boolean isEndOfStream(GenericRecord nextElement) {
>>         return false;
>>     }
>>
>>     @Override
>>     public TypeInformation<GenericRecord> getProducedType() {
>>         return TypeExtractor.getForClass(GenericRecord.class);
>>     }
>>
>>     private void checkInitialized() {
>>         if (inner == null) {
>>             Map<String, Object> props = new HashMap<>();
>>
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> registryUrl);
>>
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>             SchemaRegistryClient client =
>>                     new CachedSchemaRegistryClient(
>>                             registryUrl,
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>             inner = new KafkaAvroDeserializer(client, props);
>>         }
>>     }
>> }
>>
>>
>> It works locally on my machine, but when I deploy it on a YARN cluster
>> I get the exception below:
>>
>>
>> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
>> ExceptionInChainedOperatorException: Could not forward element to next
>> operator
>>     at org.apache.flink.streaming.runtime.tasks.
>> SourceStreamTask$LegacySourceFunctionThread
>> .checkThrowSourceExecutionException(SourceStreamTask.java:212)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
>> .performDefaultAction(SourceStreamTask.java:132)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
>> .java:298)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
>> StreamTask.java:403)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: org.apache.flink.streaming.runtime.tasks.
>> ExceptionInChainedOperatorException: Could not forward element to next
>> operator
>>     at org.apache.flink.streaming.runtime.tasks.
>> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654
>> )
>>     at org.apache.flink.streaming.runtime.tasks.
>> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>>     at org.apache.flink.streaming.runtime.tasks.
>> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>>     at org.apache.flink.streaming.api.operators.
>> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator
>> .java:727)
>>     at org.apache.flink.streaming.api.operators.
>> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator
>> .java:705)
>>     at org.apache.flink.streaming.api.operators.
>> StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts
>> .java:104)
>>     at org.apache.flink.streaming.api.operators.
>> StreamSourceContexts$NonTimestampContext.collectWithTimestamp(
>> StreamSourceContexts.java:111)
>>     at org.apache.flink.streaming.connectors.kafka.internals.
>> AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>>     at org.apache.flink.streaming.connectors.kafka.internal.
>> Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
>>     at org.apache.flink.streaming.connectors.kafka.internal.
>> Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
>> .run(FlinkKafkaConsumerBase.java:715)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(
>> StreamSource.java:100)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run(
>> StreamSource.java:63)
>>     at org.apache.flink.streaming.runtime.tasks.
>> SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202
>> )
>> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
>> instance of class: org.apache.avro.Schema$LockableArrayList
>> Serialization trace:
>> types (org.apache.avro.Schema$UnionSchema)
>> schema (org.apache.avro.Schema$Field)
>> fieldMap (org.apache.avro.Schema$RecordSchema)
>> schema (org.apache.avro.generic.GenericData$Record)
>>     at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase
>> .scala:136)
>>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(
>> CollectionSerializer.java:89)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
>> CollectionSerializer.java:93)
>>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
>> CollectionSerializer.java:22)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
>> .java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>> FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
>> .java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>> FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>> MapSerializer.java:143)
>>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
>> MapSerializer.java:21)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
>> .java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>> FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
>> .java:106)
>>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
>> FieldSerializer.java:528)
>>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
>> .copy(KryoSerializer.java:262)
>>     at org.apache.flink.streaming.runtime.tasks.
>> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635
>> )
>>     ... 13 more
>> Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.
>> Instantiators$$anonfun$normalJava$1 can not access a member of class
>> org.apache.avro.Schema$LockableArrayList with modifiers "public"
>>
>> Please help me to resolve this issue.
>>
>> Thanks,
>> Anuj
>>
>>
>>
>>
>>
>> On Mon, Jan 20, 2020 at 9:42 PM aj <aj...@gmail.com> wrote:
>>
>>> Thanks, Arvid for all the clarification. I will work on the approach you
>>> suggested.
>>>
>>> Thanks,
>>> Anuj
>>>
>>> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com>
>>> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> I think that there may be a fundamental misunderstanding about the role
>>>> of a schema registry in Kafka. So let me first clarify that.
>>>> In each Avro/Parquet file, all records have the same schema. The schema
>>>> is stored within the file, such that we can always retrieve the writer
>>>> schema for the records.
>>>> When Avro was first applied to Kafka, there was the basic question on
>>>> how the writer schema for any record is known to the consumer. Storing the
>>>> complete schema on each record would mean that each record would be much
>>>> larger than needed. Hence, they added the schema registry, which assigns a
>>>> unique id to each schema; that id is then embedded into the records.
>>>> Now, whenever I update a schema in my producer, I would have old
>>>> records with the old schema id and new records with the new schema id.
>>>> In my consumer, I'd use a fixed reader schema, such that my application
>>>> would not need to worry if the record is written with old or new schema; my
>>>> consumer would only see records with the reader schema.
>>>>
>>>> Given that background information, you see that in general, it's
>>>> impossible with a generic approach to write the parquet with the same
>>>> schema as it has been written in Kafka: the parquet schema needs to be
>>>> supplied statically during query compilation, while the Avro schema
>>>> actually used in Kafka is only known when the data is consumed.
>>>>
>>>> But looking further down the road:
>>>> * since you need one schema to write the parquet files, you'd need to
>>>> decide: do you want to write with the new or the old schema in case of a
>>>> schema update? That should also be the reader schema of your application
>>>> for a given event type.
>>>> * this decision has further implications: your application needs to
>>>> extract exactly one specific version of the schema from the schema registry
>>>> at query compilation. That could be either a specific schema id or the
>>>> latest schema for the event type.
>>>> * that means that the output schema is locked until you restart your
>>>> application and fetch a new latest schema in case of an update.
>>>> * at that point, it might just be easier to use the approach that I
>>>> outlined previously by bundling a specific schema with your application.
>>>>
>>>> If you want to extract the latest schema for a subject:
>>>>
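>>>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
>>>> // getAllVersions() returns version numbers, not schema ids, so ask the
>>>> // registry for the latest registered version of the subject directly.
>>>> var metadata = registryClient.getLatestSchemaMetadata(<subject>);
>>>> var schema = new Schema.Parser().parse(metadata.getSchema());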
>>>>
>>>>
>>>> On Sat, Jan 18, 2020 at 5:22 PM aj <aj...@gmail.com> wrote:
>>>>
>>>>> Thanks, Arvid.
>>>>>
>>>>> I do not fully understand the above approach,
>>>>> so for now I am thinking of going with the envelope approach that you
>>>>> suggested.
>>>>>
>>>>> One more question: I do not want to keep the schema in my consumer
>>>>> project, even if it is a single envelope schema. I want to fetch it from
>>>>> the schema registry and pass it to my parquet sink so that I always use
>>>>> the same schema as the producer. Can you provide sample code showing how
>>>>> I can infer the schema from the generic record or get it from the schema
>>>>> registry?
>>>>>
>>>>>
>>>>> Regards,
>>>>> Anuj
>>>>>
>>>>>
>>>>>
>>>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> (Readded user mailing list)
>>>>>>
>>>>>> Hi Anuj,
>>>>>>
>>>>>> since I'd still recommend going with distinct sources/sinks, let me
>>>>>> try to solve your issues in this mail. If that doesn't work out, I'd
>>>>>> address your concerns about the envelope format later.
>>>>>>
>>>>>> In Flink, you can have several subtopologies in the same application.
>>>>>>
>>>>>> Thus, for each event type, you can add
>>>>>> AvroSource(eventType) -> generic transformation/validation ->
>>>>>> AvroSink(eventType).
>>>>>>
>>>>>> I'd put all Avro schemas in one project and use an Avro plugin to
>>>>>> generate the respective Java classes. Then I'd simply create a map from
>>>>>> Avro schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA, ...) to
>>>>>> topic name (event-a, event-b, ...).
>>>>>> Next, I'd iterate over that map to add the respective subtopologies
>>>>>> to env.
>>>>>> Finally, execute everything.
>>>>>>
>>>>>> You have one project where all validations reside. But you'd have
>>>>>> almost no overhead to process a given source of eventType. The downside of
>>>>>> that approach is of course, that each new event type would require a
>>>>>> redeployment, but that seems like what you'd want to do anyhow.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Arvid
>>>>>>
>>>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <aj...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks, Arvid.
>>>>>>>
>>>>>>> 1. I like your approach, as I can write a single consumer and put the
>>>>>>> data in S3 in parquet format. The only challenge is that there are extra
>>>>>>> columns that will always be null, since I get one type of event at a time.
>>>>>>>
>>>>>>> 2. If I go with separate schemas, I am not sure how I can solve it
>>>>>>> using a single generalized consumer. My understanding so far is that I
>>>>>>> have to write a consumer for each type of event. Each consumer will read
>>>>>>> all the data, filter out its own events, and pass that stream to the
>>>>>>> sink. But this does not look like a scalable solution: as new events
>>>>>>> keep being added, I have to write a consumer for each new type.
>>>>>>>
>>>>>>>
>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>         .addSource(
>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>                         config).setStartFromEarliest());
>>>>>>>
>>>>>>> Example :
>>>>>>>
>>>>>>> 1st Consumer:
>>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>                 config).setStartFromEarliest());
>>>>>>> // Avro strings are Utf8 instances, hence the String.valueOf(...)
>>>>>>> DataStream<GenericRecord> aInput = input.filter(
>>>>>>>         record -> "a".equals(String.valueOf(record.get("event_name"))));
>>>>>>>
>>>>>>> 2nd Consumer:
>>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>                 config).setStartFromEarliest());
>>>>>>> DataStream<GenericRecord> bInput = input.filter(
>>>>>>>         record -> "b".equals(String.valueOf(record.get("event_name"))));
>>>>>>>
>>>>>>>
>>>>>>> Can you help me solve this using a single consumer? I do not
>>>>>>> want to write a separate consumer for each type of schema.
>>>>>>>
>>>>>>> For example, this is my consumer, which receives different types of
>>>>>>> records.
>>>>>>>
>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>         .addSource(
>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>                         config).setStartFromEarliest());
>>>>>>>
>>>>>>> Now I cannot write this stream directly, as the records in it have no
>>>>>>> common schema. One possible way I am thinking of is:
>>>>>>>
>>>>>>> 1. Can I create multiple streams from this stream using keyBy on
>>>>>>> "event_name" and then write each stream separately?
>>>>>>>
>>>>>>> I just want to know whether this is possible.
>>>>>>>
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Anuj
>>>>>>>
>>>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Anuj,
>>>>>>>>
>>>>>>>> I originally understood that you would like to store data in the
>>>>>>>> same Kafka topic and also want to store it in the same parquet file. In the
>>>>>>>> past, I mostly used schema registry with Kafka Streams, where you could
>>>>>>>> only store a schema for a key and value respectively. To use different
>>>>>>>> record types in the same kafka topic, you had to disable schema
>>>>>>>> compatibility checks and just stored the schemas as different versions
>>>>>>>> under the same subject.
>>>>>>>>
>>>>>>>> Your approach is much better. You can ensure full schema
>>>>>>>> compatibility. Nevertheless, it still shares the same drawback that
>>>>>>>> consumption is much harder (using GenericRecord is proof of that) if you
>>>>>>>> want to read/write everything into the same place. Also you will never be
>>>>>>>> able to write one consistent file, as they can only have one schema (both
>>>>>>>> on Avro and Parquet).
>>>>>>>> So you only have two options:
>>>>>>>> * keep schemas separated, but then you also need to write separate
>>>>>>>> files per record type.
>>>>>>>> * have a common schema (either my outlined approach or any other
>>>>>>>> wrapper schema).
>>>>>>>> The approach with a common schema only makes sense if you want to
>>>>>>>> write it into one table/Kafka topic.
>>>>>>>>
>>>>>>>> However, in the last mail you pointed out that you actually want to
>>>>>>>> store the record types separately. In that case, you should keep
>>>>>>>> everything separated and have one sink per type, each getting the
>>>>>>>> respective schema. Note that you'd need to fetch the schema manually from
>>>>>>>> the schema registry when creating the query as you would need to pass it to
>>>>>>>> the sink.
>>>>>>>>
>>>>>>>> Btw, do you actually have a need to write all events into one Kafka
>>>>>>>> topic? The only real use case is to preserve the time order per key.
>>>>>>>> Everything else is much more complicated than storing events individually.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <aj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Arvid,
>>>>>>>>> Thanks for the quick response. I am new to this Avro design so can
>>>>>>>>> you please help me understand and design for my use case.
>>>>>>>>>
>>>>>>>>> I have use case like this :
>>>>>>>>>
>>>>>>>>> 1. We have an app where users perform a lot of actions.
>>>>>>>>> 2. For each action we collect a set of information that is
>>>>>>>>> defined as key-value pairs. We want to define this information with
>>>>>>>>> proper schemas so that we maintain the proper format and do not push
>>>>>>>>> random data.
>>>>>>>>> 3. So we define a schema for each action and register it in the
>>>>>>>>> schema registry using topic+record.name as the subject.
>>>>>>>>> 4. So I do not think the producer side has any issue, as whenever
>>>>>>>>> we push an event to Kafka we register its schema under the above subject.
>>>>>>>>>
>>>>>>>>> Example :
>>>>>>>>>
>>>>>>>>> {
>>>>>>>>>   "event_name": "a",
>>>>>>>>>   "timestamp": ...,
>>>>>>>>>   "properties": [
>>>>>>>>>     "key-1": "val-1",
>>>>>>>>>     "key-2": "val-2"
>>>>>>>>>   ]
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> {
>>>>>>>>>   "event_name": "b",
>>>>>>>>>   "timestamp": ...,
>>>>>>>>>   "properties": [
>>>>>>>>>     "key-3": "val-3",
>>>>>>>>>     "key-4": "val-4"
>>>>>>>>>   ]
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Now I have a consumer that parses the data by fetching the schema
>>>>>>>>> from the schema registry and deserializes it into a stream of generic
>>>>>>>>> records.
>>>>>>>>>
>>>>>>>>> Why do you think it will break, given that I always deserialize with
>>>>>>>>> the writer schema only?
>>>>>>>>>
>>>>>>>>> You suggested keeping one envelope Avro schema instead of a separate
>>>>>>>>> schema for each type of event that I generate. I have some doubts
>>>>>>>>> about that:
>>>>>>>>>
>>>>>>>>> 1. How do I enforce a schema on each event when it is a subtype in the
>>>>>>>>> main schema? When I get a JSON event of type "a", how do I enforce and
>>>>>>>>> convert it to the subschema type "a" and push it to Kafka?
>>>>>>>>> 2. I want to create a separate Hive table for each of the events,
>>>>>>>>> so when I write this data and, let's say, I have 20 events, then 19
>>>>>>>>> columns will always contain null values.
>>>>>>>>>
>>>>>>>>> Please help me in doing this right way. It will be a great help
>>>>>>>>> and learning for me.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Anuj
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Anuj,
>>>>>>>>>>
>>>>>>>>>> you should always avoid having records with different schemas in
>>>>>>>>>> the same topic/dataset. You will break the compatibility features of the
>>>>>>>>>> schema registry, and your consumer/producer code will always be hard to
>>>>>>>>>> maintain.
>>>>>>>>>>
>>>>>>>>>> A common and scalable way to avoid it is to use some kind of
>>>>>>>>>> envelope format.
>>>>>>>>>>
>>>>>>>>>> {
>>>>>>>>>>   "namespace": "example",
>>>>>>>>>>   "name": "Envelope",
>>>>>>>>>>   "type": "record",
>>>>>>>>>>   "fields": [
>>>>>>>>>>     {
>>>>>>>>>>       "name": "type1",
>>>>>>>>>>       "type": ["null", {
>>>>>>>>>>         "type": "record",
>>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>>       }],
>>>>>>>>>>       "default": null
>>>>>>>>>>     },
>>>>>>>>>>     {
>>>>>>>>>>       "name": "type2",
>>>>>>>>>>       "type": ["null", {
>>>>>>>>>>         "type": "record",
>>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>>       }],
>>>>>>>>>>       "default": null
>>>>>>>>>>     }
>>>>>>>>>>   ]
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>>>>>>>> types, which by themselves can be evolved), and adds only a little overhead
>>>>>>>>>> (1 byte per subtype). The downside is that you cannot enforce that exactly
>>>>>>>>>> one of the subtypes is set.
>>>>>>>>>>
>>>>>>>>>> This schema is fully compatible with the schema registry, so no
>>>>>>>>>> need to parse anything manually.
>>>>>>>>>>
>>>>>>>>>> This schema can easily be used with Parquet. If you can't change
>>>>>>>>>> the input format anymore, you can at least use that approach on your output.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>>
>>>>>>>>>> Arvid
>>>>>>>>>>
>>>>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <aj...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I have a use case where I am getting a different set of Avro
>>>>>>>>>>> records in Kafka. I am using the schema registry to store Avro schema. One
>>>>>>>>>>> topic can also have different types of records.
>>>>>>>>>>>
>>>>>>>>>>> Now I have created a GenericRecord stream using
>>>>>>>>>>> KafkaAvroDeserializer by defining a custom
>>>>>>>>>>> deserializer class like this:
>>>>>>>>>>>
>>>>>>>>>>> @Override
>>>>>>>>>>> public GenericRecord deserialize(
>>>>>>>>>>>         byte[] messageKey, byte[] message, String topic, int partition,
>>>>>>>>>>>         long offset) {
>>>>>>>>>>>     checkInitialized();
>>>>>>>>>>>     return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> private void checkInitialized() {
>>>>>>>>>>>     if (inner == null) {
>>>>>>>>>>>         Map<String, Object> props = new HashMap<>();
>>>>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>>>>>>>>>>>                 registryUrl);
>>>>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG,
>>>>>>>>>>>                 false);
>>>>>>>>>>>         SchemaRegistryClient client =
>>>>>>>>>>>                 new CachedSchemaRegistryClient(
>>>>>>>>>>>                         registryUrl,
>>>>>>>>>>>                         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>>>>         inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> And this is my consumer code :
>>>>>>>>>>>
>>>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>>>         .addSource(
>>>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>>>>
>>>>>>>>>>> Now I want to write this stream partitioned on
>>>>>>>>>>> *event_name="a"/year=/month=/day=* in parquet format so that I
>>>>>>>>>>> can expose Hive tables directly on top of this data.
>>>>>>>>>>> event_name is a common field for all types of records that I am
>>>>>>>>>>> getting in Kafka.
>>>>>>>>>>> I am stuck, as the parquet writer needs a schema to write but my
>>>>>>>>>>> different records have different schemas. So how do I write this
>>>>>>>>>>> stream to S3 in the above partition format?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Thanks & Regards,
>>>>>>>>>>> Anuj Jain
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Thanks & Regards,
>>>>>>>>> Anuj Jain
>>>>>>>>> Mob. : +91- 8588817877
>>>>>>>>> Skype : anuj.jain07
>>>>>>>>> <http://www.oracle.com/>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>>
>>
>

Re: Flink ParquetAvroWriters Sink

Posted by Arvid Heise <ar...@ververica.com>.
Hi Anuj,

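I recommend using the ConfluentRegistryAvroDeserializationSchema [1] with a
specific record that has been generated with the Avro Maven Plugin [2] or
Avro Gradle Plugin [3]. That should result in almost no code and maximal
maintainability. It also reports Avro type information to Flink, so records
are no longer copied through Kryo, which is where your exception is thrown.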

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#the-deserializationschema
[2] https://avro.apache.org/docs/1.8.2/gettingstartedjava.html
[3] https://github.com/davidmc24/gradle-avro-plugin
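
For context on what a registry-aware deserializer has to do before it can
decode a record: Confluent's serializers prefix every Kafka message with a
magic byte (0) and a 4-byte big-endian schema id, and the id is used to look
up the writer schema in the registry. The sketch below only illustrates that
framing with plain JDK classes. The names ConfluentFraming, schemaId and
avroPayload are made up for this example and are not part of Flink or the
Confluent client.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Illustrative helper (hypothetical name) that splits a Confluent-framed message. */
final class ConfluentFraming {
    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema id

    private ConfluentFraming() {}

    /** Returns the schema id stored big-endian in bytes 1..4 of the message. */
    static int schemaId(byte[] message) {
        checkHeader(message);
        return ByteBuffer.wrap(message, 1, 4).getInt();
    }

    /** Returns the Avro-encoded body that follows the 5-byte header. */
    static byte[] avroPayload(byte[] message) {
        checkHeader(message);
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }

    private static void checkHeader(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("Message is shorter than the 5-byte header");
        }
        if (message[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte: " + message[0]);
        }
    }

    public static void main(String[] args) {
        // Header for schema id 42, followed by two arbitrary payload bytes.
        byte[] framed = {0, 0, 0, 0, 42, 2, 97};
        System.out.println("schema id: " + schemaId(framed));
        System.out.println("payload bytes: " + avroPayload(framed).length);
    }
}
```

In a real job you would not parse this yourself; the point is only that the
writer schema is resolved per record at runtime, while the reader schema (the
generated specific record) is fixed when the job is built.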

On Wed, Jan 22, 2020 at 6:43 PM aj <aj...@gmail.com> wrote:

> Hi Arvid,
>
> I have implemented the code with the envelope schema as you suggested, but
> now I am facing issues with the consumer. I have written code like this:
>
> FlinkKafkaConsumer010<GenericRecord> kafkaConsumer010 =
>         new FlinkKafkaConsumer010<>(KAFKA_TOPICS,
>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                 properties);
>
> And the deserialization class looks like this:
>
> public class KafkaGenericAvroDeserializationSchema implements
>         KeyedDeserializationSchema<GenericRecord> {
>
>     private final String registryUrl;
>     private transient KafkaAvroDeserializer inner;
>
>     public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>         this.registryUrl = registryUrl;
>     }
>
>     @Override
>     public GenericRecord deserialize(byte[] messageKey, byte[] message,
>             String topic, int partition, long offset) {
>         checkInitialized();
>         return (GenericRecord) inner.deserialize(topic, message);
>     }
>
>     @Override
>     public boolean isEndOfStream(GenericRecord nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return TypeExtractor.getForClass(GenericRecord.class);
>     }
>
>     private void checkInitialized() {
>         if (inner == null) {
>             Map<String, Object> props = new HashMap<>();
>
>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>                     registryUrl);
>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>             SchemaRegistryClient client =
>                     new CachedSchemaRegistryClient(
>                             registryUrl,
>                             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>             inner = new KafkaAvroDeserializer(client, props);
>         }
>     }
> }
>
>
> It works locally on my machine, but when I deploy it on a YARN cluster
> I get the exception below:
>
>
> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
>     at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask$LegacySourceFunctionThread
> .checkThrowSourceExecutionException(SourceStreamTask.java:212)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask
> .performDefaultAction(SourceStreamTask.java:132)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask
> .java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(
> StreamTask.java:403)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.
> ExceptionInChainedOperatorException: Could not forward element to next
> operator
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 727)
>     at org.apache.flink.streaming.api.operators.
> AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:
> 705)
>     at org.apache.flink.streaming.api.operators.
> StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts
> .java:104)
>     at org.apache.flink.streaming.api.operators.
> StreamSourceContexts$NonTimestampContext.collectWithTimestamp(
> StreamSourceContexts.java:111)
>     at org.apache.flink.streaming.connectors.kafka.internals.
> AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>     at org.apache.flink.streaming.connectors.kafka.internal.
> Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher
> .runFetchLoop(Kafka09Fetcher.java:156)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase
> .run(FlinkKafkaConsumerBase.java:715)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(
> StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.
> SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing
> instance of class: org.apache.avro.Schema$LockableArrayList
> Serialization trace:
> types (org.apache.avro.Schema$UnionSchema)
> schema (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>     at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:
> 136)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(
> CollectionSerializer.java:89)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:93)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(
> CollectionSerializer.java:22)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:143)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(
> MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField
> .java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(
> FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
> .copy(KryoSerializer.java:262)
>     at org.apache.flink.streaming.runtime.tasks.
> OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
>     ... 13 more
> Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.
> Instantiators$$anonfun$normalJava$1 can not access a member of class
> org.apache.avro.Schema$LockableArrayList with modifiers "public"
>
> Please help me to resolve this issue.
>
> Thanks,
> Anuj
>
>
>
>
>
> On Mon, Jan 20, 2020 at 9:42 PM aj <aj...@gmail.com> wrote:
>
>> Thanks, Arvid for all the clarification. I will work on the approach you
>> suggested.
>>
>> Thanks,
>> Anuj
>>
>> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Anuj,
>>>
>>> I think that there may be a fundamental misunderstanding about the role
>>> of a schema registry in Kafka. So let me first clarify that.
>>> In each Avro/Parquet file, all records have the same schema. The schema
>>> is stored within the file, such that we can always retrieve the writer
>>> schema for the records.
>>> When Avro was first applied to Kafka, there was the basic question on
>>> how the writer schema for any record is known to the consumer. Storing the
>>> complete schema on each record would mean that each record would be much
>>> larger than needed. Hence, they added the schema registry, which assigns a
>>> unique id to each schema; that id is then embedded into the records.
>>> Now, whenever I update a schema in my producer, I would have old records
>>> with the old schema id and new records with the new schema id.
>>> In my consumer, I'd use a fixed reader schema, such that my application
>>> would not need to worry if the record is written with old or new schema; my
>>> consumer would only see records with the reader schema.
>>>
>>> Given that background information, you see that in general, it's
>>> impossible with a generic approach to write the parquet with the same
>>> schema as it has been written in Kafka: the parquet schema needs to be
>>> supplied statically during query compilation, while the Avro schema
>>> actually used in Kafka is only known when the data is consumed.
>>>
>>> But looking further down the road:
>>> * since you need one schema to write the parquet files, you'd need to
>>> decide: do you want to write with the new or the old schema in case of a
>>> schema update? That should also be the reader schema of your application
>>> for a given event type.
>>> * this decision has further implications: your application needs to
>>> extract exactly one specific version of the schema from the schema registry
>>> at query compilation. That could be either a specific schema id or the
>>> latest schema for the event type.
>>> * that means that the output schema is locked until you restart your
>>> application and fetch a new latest schema in case of an update.
>>> * at that point, it might just be easier to use the approach that I
>>> outlined previously by bundling a specific schema with your application.
>>>
>>> If you want to extract the latest schema for a subject:
>>>
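>>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
>>> // getAllVersions returns version numbers, not schema ids, so ask for the
>>> // latest version of the subject directly and parse its schema string.
>>> var metadata = registryClient.getLatestSchemaMetadata(<subject>);
>>> var schema = new Schema.Parser().parse(metadata.getSchema());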
>>>
>>>
>>> On Sat, Jan 18, 2020 at 5:22 PM aj <aj...@gmail.com> wrote:
>>>
>>>> Thanks, Arvid.
>>>>
>>>> I do not fully understand the above approach,
>>>> so currently, I am thinking to go with the envelope approach that you
>>>> suggested.
>>>>
>>>> One more question: I do not want to keep the schema in my consumer
>>>> project, even if it is a single envelope schema. I want it to be fetched from the
>>>> schema registry and passed to my parquet sink so that I always use the same
>>>> schema that is used by the producer. Can you provide sample code showing how
>>>> I can infer the schema from the generic record or get it from the schema registry?
>>>>
>>>>
>>>> Regards,
>>>> Anuj
>>>>
>>>>
>>>>
>>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> (Readded user mailing list)
>>>>>
>>>>> Hi Anuj,
>>>>>
>>>>> since I'd still recommend going with distinct sources/sinks, let me
>>>>> try to solve your issues in this mail. If that doesn't work out, I'd
>>>>> address your concerns about the envelope format later.
>>>>>
>>>>> In Flink, you can have several subtopologies in the same application.
>>>>>
>>>>> Thus, for each event type, you can add
>>>>> AvroSource(eventType) -> generic transformation/validation ->
>>>>> AvroSink(eventType)
>>>>> for each event.
>>>>>
>>>>> I'd put all avro schema in one project and use an avro plugin to
>>>>> generate the respective Java Classes. Then I'd simply create a map of Avro
>>>>> Schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
>>>>> (event-a, event-b, ...).
>>>>> Next, I'd iterate over the list to add the respective subtopologies to
>>>>> env.
>>>>> Finally, execute everything.
>>>>>
>>>>> You have one project where all validations reside. But you'd have
>>>>> almost no overhead to process a given source of eventType. The downside of
>>>>> that approach is of course, that each new event type would require a
>>>>> redeployment, but that seems like what you'd want to do anyhow.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <aj...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Arvid.
>>>>>>
>>>>>> 1. I like your approach, as I can write a single consumer and put the
>>>>>> data in S3 in parquet format. The only challenge is that there are extra columns
>>>>>> that are always going to be null, since each record carries only one type of event.
>>>>>>
>>>>>> 2. If I go with separate schemas, I am not sure how I can solve it
>>>>>> using a single generalized consumer. My understanding so far is that I
>>>>>> have to write a consumer for each type of event. Each consumer will read
>>>>>> the whole data, filter out its own events, and then pass
>>>>>> that stream to the sink. But this does not look like a scalable solution: as
>>>>>> new event types keep being added, I have to write a consumer for each new type.
>>>>>>
>>>>>>
>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>         .addSource(
>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                         new
>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                         config).setStartFromEarliest());
>>>>>>
>>>>>> Example :
>>>>>>
>>>>>> * 1st Consumer:*
>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                 config).setStartFromEarliest());
>>>>>> DataStream<GenericRecord> aInput = input.filter(
>>>>>>         record -> "a".equals(String.valueOf(record.get("event_name"))));
>>>>>>
>>>>>> * 2nd Consumer:*
>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                 config).setStartFromEarliest());
>>>>>> DataStream<GenericRecord> bInput = input.filter(
>>>>>>         record -> "b".equals(String.valueOf(record.get("event_name"))));
>>>>>>
>>>>>>
>>>>>> Can you help me solve this using a single consumer, as I do not
>>>>>> want to write a separate consumer for each type of schema?
>>>>>>
>>>>>> For example, this is my consumer that contains different types of
>>>>>> records.
>>>>>>
>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>         .addSource(
>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                         new
>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                         config).setStartFromEarliest());
>>>>>>
>>>>>> Now I cannot write this stream directly, as there is no common schema
>>>>>> for the records in this stream. One possible way I am thinking of is:
>>>>>>
>>>>>> 1. Can I create multiple streams from this stream by keying on *"event_name"*
>>>>>> and then write each stream separately?
>>>>>>
>>>>>> I just want to know whether this is possible.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Anuj
>>>>>>
>>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Anuj,
>>>>>>>
>>>>>>> I originally understood that you would like to store data in the
>>>>>>> same Kafka topic and also want to store it in the same parquet file. In the
>>>>>>> past, I mostly used schema registry with Kafka Streams, where you could
>>>>>>> only store a schema for a key and value respectively. To use different
>>>>>>> record types in the same kafka topic, you had to disable schema
>>>>>>> compatibility checks and just stored the schemas as different versions
>>>>>>> under the same subject.
>>>>>>>
>>>>>>> Your approach is much better. You can ensure full schema
>>>>>>> compatibility. Nevertheless, it still shares the same drawback that
>>>>>>> consumption is much harder (using GenericRecord is proof of that) if you
>>>>>>> want to read/write everything into the same place. Also you will never be
>>>>>>> able to write one consistent file, as they can only have one schema (both
>>>>>>> on Avro and Parquet).
>>>>>>> So you only have two options:
>>>>>>> * keep schemas separated, but then you also need to write separate
>>>>>>> files per record type.
>>>>>>> * have a common schema (either my outlined approach or any other
>>>>>>> wrapper schema).
>>>>>>> The approach with a common schema makes only sense if you want to
>>>>>>> write it into one table/kafka topic.
>>>>>>>
>>>>>>> However, in the last mail you pointed out that you actually want to
>>>>>>> store the record types separately. Then, you should keep everything
>>>>>>> separated. Then you should have a sink for each type each getting the
>>>>>>> respective schema. Note that you'd need to fetch the schema manually from
>>>>>>> the schema registry when creating the query as you would need to pass it to
>>>>>>> the sink.
>>>>>>>
>>>>>>> Btw, do you actually have a need to write all events into one Kafka
>>>>>>> topic? The only real use case is to preserve the time order per key.
>>>>>>> Everything else is much more complicated than storing events individually.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <aj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Arvid,
>>>>>>>> Thanks for the quick response. I am new to this Avro design, so can
>>>>>>>> you please help me understand and design for my use case?
>>>>>>>>
>>>>>>>> My use case is this:
>>>>>>>>
>>>>>>>> 1. We have an app where a lot of actions happen on the user side.
>>>>>>>> 2. For each action we collect a set of information that is
>>>>>>>> defined using key-value pairs. We want to define this information with
>>>>>>>> proper schemas so that we maintain the proper format and do not push random
>>>>>>>> data.
>>>>>>>> 3. So we are defining a schema for each action and registering it in the
>>>>>>>> schema registry using topic+record.name as the subject.
>>>>>>>> 4. So I do not think the producer side has any issue, as whenever we
>>>>>>>> push an event to Kafka we register a new schema with the above subject.
>>>>>>>>
>>>>>>>> Example :
>>>>>>>>
>>>>>>>> {
>>>>>>>>   "event_name": "a",
>>>>>>>>   "timestamp": ...,
>>>>>>>>   "properties": {
>>>>>>>>     "key-1": "val-1",
>>>>>>>>     "key-2": "val-2"
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> {
>>>>>>>>   "event_name": "b",
>>>>>>>>   "timestamp": ...,
>>>>>>>>   "properties": {
>>>>>>>>     "key-3": "val-3",
>>>>>>>>     "key-4": "val-4"
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Now I have a consumer that parses the data by fetching the
>>>>>>>> schema from the schema registry and deserializes it into a stream of generic records.
>>>>>>>>
>>>>>>>> Why do you think it will break, as I am always deserializing with the
>>>>>>>> writer schema only?
>>>>>>>>
>>>>>>>> You suggested keeping an envelope Avro schema instead of a separate
>>>>>>>> schema for each type of event that I am generating. I have some doubts
>>>>>>>> about that:
>>>>>>>>
>>>>>>>> 1. How do I enforce a schema on each event when it is a subtype in the main
>>>>>>>> schema? When I get a JSON event of type "a", how do I enforce and
>>>>>>>> convert it to the subschema type of "a" and push it to Kafka?
>>>>>>>> 2. I want to create a separate hive table for each of the events, so
>>>>>>>> when I write this data and, let's say, I have 20 events, then for 19 columns I
>>>>>>>> will always get null values in the data.
>>>>>>>>
>>>>>>>> Please help me do this the right way. It will be a great help and
>>>>>>>> learning for me.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Anuj
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Anuj,
>>>>>>>>>
>>>>>>>>> you should always avoid having records with different schemas in
>>>>>>>>> the same topic/dataset. You will break the compatibility features of the
>>>>>>>>> schema registry and your consumer/producer code is always hard to maintain.
>>>>>>>>>
>>>>>>>>> A common and scalable way to avoid it is to use some kind of
>>>>>>>>> envelope format.
>>>>>>>>>
>>>>>>>>> {
>>>>>>>>>   "namespace": "example",
>>>>>>>>>   "name": "Envelope",
>>>>>>>>>   "type": "record",
>>>>>>>>>   "fields": [
>>>>>>>>>     {
>>>>>>>>>       "name": "type1",
>>>>>>>>>       "type": ["null", {
>>>>>>>>>         "type": "record",
>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>       }],
>>>>>>>>>       "default": null
>>>>>>>>>     },
>>>>>>>>>     {
>>>>>>>>>       "name": "type2",
>>>>>>>>>       "type": ["null", {
>>>>>>>>>         "type": "record",
>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>       }],
>>>>>>>>>       "default": null
>>>>>>>>>     }
>>>>>>>>>   ]
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>>>>>>> types, which by themselves can be evolved), and adds only a little overhead
>>>>>>>>> (1 byte per subtype). The downside is that you cannot enforce that exactly
>>>>>>>>> one of the subtypes is set.
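
Because the schema itself cannot express "exactly one subtype is set", that check has to live in application code. A minimal sketch, with a plain Map standing in for the envelope record (field name to value); the class and method names are hypothetical:

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical helper; the Map stands in for an envelope record's fields.
final class EnvelopeCheck {
    /** True if exactly one of the wrapped subtype fields is non-null. */
    static boolean exactlyOneSet(Map<String, ?> envelopeFields) {
        return envelopeFields.values().stream()
                .filter(Objects::nonNull)
                .count() == 1;
    }
}
```

A producer could run such a check before serializing, and a consumer could run it before routing a record to its sink.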
>>>>>>>>>
>>>>>>>>> This schema is fully compatible with the schema registry, so no
>>>>>>>>> need to parse anything manually.
>>>>>>>>>
>>>>>>>>> This schema can easily be used with Parquet. If you can't change
>>>>>>>>> the input format anymore, you can at least use that approach on your output.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Arvid
>>>>>>>>>
>>>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <aj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have a use case where I am getting a different set of Avro
>>>>>>>>>> records in Kafka. I am using the schema registry to store Avro schema. One
>>>>>>>>>> topic can also have different types of records.
>>>>>>>>>>
>>>>>>>>>> Now I have created a GenericRecord stream using
>>>>>>>>>> KafkaAvroDeserializer by defining a custom
>>>>>>>>>> deserializer class like this:
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public GenericRecord deserialize(
>>>>>>>>>>         byte[] messageKey, byte[] message, String topic, int partition, long offset) {
>>>>>>>>>>     checkInitialized();
>>>>>>>>>>     return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> private void checkInitialized() {
>>>>>>>>>>     if (inner == null) {
>>>>>>>>>>         Map<String, Object> props = new HashMap<>();
>>>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>>>>>>>>         SchemaRegistryClient client =
>>>>>>>>>>                 new CachedSchemaRegistryClient(
>>>>>>>>>>                         registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>>>         inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> And this is my consumer code :
>>>>>>>>>>
>>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>>         .addSource(
>>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>>                         new
>>>>>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>>>
>>>>>>>>>> Now I want to write this stream partitioned on
>>>>>>>>>> *event_name="a"/year=/month=/day=* in parquet format so that I
>>>>>>>>>> can expose hive tables directly on top of this data.
>>>>>>>>>> event_name is a common field for all types of records that I am
>>>>>>>>>> getting in Kafka.
>>>>>>>>>> I am stuck, as the parquet writer needs a schema to write, but my
>>>>>>>>>> different records have different schemas. So how do I write this stream to
>>>>>>>>>> S3 in the above partition format?
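
The partition layout above can be sketched as a function from a record's event_name and timestamp to a directory name. This is only the path logic in plain Java, assuming the timestamp is in epoch milliseconds, days are cut in UTC, and the event name is written without quotes. In a Flink file sink this logic would typically live in a custom BucketAssigner's getBucketId. The class name is hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper that builds a Hive-style partition path.
final class EventPartitionPath {
    // Quoted parts are literals; yyyy, MM and dd are filled from the timestamp (UTC assumed).
    private static final DateTimeFormatter DATE_PART =
            DateTimeFormatter.ofPattern("'year='yyyy'/month='MM'/day='dd")
                    .withZone(ZoneOffset.UTC);

    /** Builds event_name=<name>/year=<yyyy>/month=<MM>/day=<dd>. */
    static String bucketId(String eventName, long epochMillis) {
        return "event_name=" + eventName + "/"
                + DATE_PART.format(Instant.ofEpochMilli(epochMillis));
    }
}
```

Because the event name is part of the path, each event type lands in its own directory tree, which is what a per-event Hive table needs.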
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks & Regards,
>>>>>>>>>> Anuj Jain
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Thanks & Regards,
>>>>>>>> Anuj Jain
>>>>>>>> Mob. : +91- 8588817877
>>>>>>>> Skype : anuj.jain07
>>>>>>>> <http://www.oracle.com/>
>>>>>>>>
>>>>>>>>
>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>
>
>

Re: Flink ParquetAvroWriters Sink

Posted by aj <aj...@gmail.com>.
I was able to resolve this issue by setting classloader.resolve-order to
parent-first.
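
A minimal sketch of that setting, assuming it is placed in the cluster's flink-conf.yaml. Flink's default for this option is child-first, which loads classes from the user jar before the ones on Flink's own classpath:

```yaml
# flink-conf.yaml (assumed location of the setting)
classloader.resolve-order: parent-first
```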

On Wed, Jan 22, 2020, 23:13 aj <aj...@gmail.com> wrote:

> Hi Arvid,
>
> I have implemented the code with the envelope schema as you suggested, but now
> I am facing issues with the consumer. I have written code like this:
>
> FlinkKafkaConsumer010<GenericRecord> kafkaConsumer010 =
>         new FlinkKafkaConsumer010<>(KAFKA_TOPICS,
>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                 properties);
>
> And the Deserialization class looks like this :
>
> public class KafkaGenericAvroDeserializationSchema implements
>         KeyedDeserializationSchema<GenericRecord> {
>
>     private final String registryUrl;
>     private transient KafkaAvroDeserializer inner;
>
>     public KafkaGenericAvroDeserializationSchema(String registryUrl) {
>         this.registryUrl = registryUrl;
>     }
>
>     @Override
>     public GenericRecord deserialize(byte[] messageKey, byte[] message,
>             String topic, int partition, long offset) {
>         checkInitialized();
>         return (GenericRecord) inner.deserialize(topic, message);
>     }
>
>     @Override
>     public boolean isEndOfStream(GenericRecord nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<GenericRecord> getProducedType() {
>         return TypeExtractor.getForClass(GenericRecord.class);
>     }
>
>     // Lazily creates the Confluent deserializer on the task manager,
>     // because KafkaAvroDeserializer is not serializable.
>     private void checkInitialized() {
>         if (inner == null) {
>             Map<String, Object> props = new HashMap<>();
>             props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>             props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>             SchemaRegistryClient client =
>                     new CachedSchemaRegistryClient(
>                             registryUrl,
>                             AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>             inner = new KafkaAvroDeserializer(client, props);
>         }
>     }
> }
>
>
> It works locally on my machine, but when I deploy it on a YARN cluster
> I get the exception below:
>
>
> java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
>     at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
>     at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
>     at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
> Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
> Serialization trace:
> types (org.apache.avro.Schema$UnionSchema)
> schema (org.apache.avro.Schema$Field)
> fieldMap (org.apache.avro.Schema$RecordSchema)
> schema (org.apache.avro.generic.GenericData$Record)
>     at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:136)
>     at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
>     at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
>     at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
>     at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
>     ... 13 more
> Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$LockableArrayList with modifiers "public"
>
> Please help me to resolve this issue.
>
> Thanks,
> Anuj
>
>
>
>
>
> On Mon, Jan 20, 2020 at 9:42 PM aj <aj...@gmail.com> wrote:
>
>> Thanks, Arvid for all the clarification. I will work on the approach you
>> suggested.
>>
>> Thanks,
>> Anuj
>>
>> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com> wrote:
>>
>>> Hi Anuj,
>>>
>>> I think that there may be a fundamental misunderstanding about the role
>>> of a schema registry in Kafka. So let me first clarify that.
>>> In each Avro/Parquet file, all records have the same schema. The schema
>>> is stored within the file, such that we can always retrieve the writer
>>> schema for the records.
>>> When Avro was first applied to Kafka, there was the basic question on
>>> how the writer schema for any record is known to the consumer. Storing the
>>> complete schema on each record would mean that each record would be much
>>> larger than needed. Hence, they added the schema registry that assigns a
>>> unique id to schema, which is then embedded into the records.
>>> Now, whenever I update a schema in my producer, I would have old records
>>> with the old schema id and new records with the new schema id.
>>> In my consumer, I'd use a fixed reader schema, such that my application
>>> would not need to worry if the record is written with old or new schema; my
>>> consumer would only see records with the reader schema.
>>>
>>> Given that background information, you see that in general, it's
>>> impossible with a generic approach to write the parquet with the same
>>> schema as it has been written in Kafka: the parquet schema needs to be
>>> supplied statically during query compilation while the actual used Avro
>>> schema in Kafka is only known when actually consuming data.
>>>
>>> But looking further down the road:
>>> * since you need one schema to write the parquet files, you'd need to
>>> decide: do you want to write with the new or the old schema in case of a
>>> schema update? That should also be the reader schema of your application
>>> for a given event type.
>>> * this decision has further implications: your application needs to
>>> extract exactly one specific version of the schema from the schema registry
>>> at query compilation. That could be either a specific schema id or the
>>> latest schema for the event type.
>>> * that means that the output schema is locked until you restart your
>>> application and fetch a new latest schema in case of an update.
>>> * at that point, it might just be easier to use the approach that I
>>> outlined previously by bundling a specific schema with your application.
>>>
>>> If you want to extract the latest schema for a subject:
>>>
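>>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
>>> // getAllVersions returns version numbers, not schema ids, so ask for the
>>> // latest version of the subject directly and parse its schema string.
>>> var metadata = registryClient.getLatestSchemaMetadata(<subject>);
>>> var schema = new Schema.Parser().parse(metadata.getSchema());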
>>>
>>>
>>> On Sat, Jan 18, 2020 at 5:22 PM aj <aj...@gmail.com> wrote:
>>>
>>>> Thanks, Arvid.
>>>>
>>>> I do not fully understand the above approach,
>>>> so currently, I am thinking to go with the envelope approach that you
>>>> suggested.
>>>>
>>>> One more question: I do not want to keep the schema in my consumer
>>>> project, even if it is a single envelope schema. I want it to be fetched from the
>>>> schema registry and passed to my parquet sink so that I always use the same
>>>> schema that is used by the producer. Can you provide sample code showing how
>>>> I can infer the schema from the generic record or get it from the schema registry?
>>>>
>>>>
>>>> Regards,
>>>> Anuj
>>>>
>>>>
>>>>
>>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com>
>>>> wrote:
>>>>
>>>>> (Readded user mailing list)
>>>>>
>>>>> Hi Anuj,
>>>>>
>>>>> since I'd still recommend going with distinct sources/sinks, let me
>>>>> try to solve your issues in this mail. If that doesn't work out, I'd
>>>>> address your concerns about the envelope format later.
>>>>>
>>>>> In Flink, you can have several subtopologies in the same application.
>>>>>
>>>>> Thus, for each event type, you can add
>>>>> AvroSource(eventType) -> generic transformation/validation ->
>>>>> AvroSink(eventType)
>>>>> for each event.
>>>>>
>>>>> I'd put all avro schema in one project and use an avro plugin to
>>>>> generate the respective Java Classes. Then I'd simply create a map of Avro
>>>>> Schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
>>>>> (event-a, event-b, ...).
>>>>> Next, I'd iterate over the list to add the respective subtopologies to
>>>>> env.
>>>>> Finally, execute everything.
>>>>>
>>>>> You have one project where all validations reside. But you'd have
>>>>> almost no overhead to process a given source of eventType. The downside of
>>>>> that approach is of course, that each new event type would require a
>>>>> redeployment, but that seems like what you'd want to do anyhow.
>>>>>
>>>>> Best,
>>>>>
>>>>> Arvid
>>>>>
>>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <aj...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Arvid.
>>>>>>
>>>>>> 1. I like your approach, as I can write a single consumer and put the
>>>>>> data in S3 in parquet format. The only challenge is that there are extra columns
>>>>>> that are always going to be null, since each record carries only one type of event.
>>>>>>
>>>>>> 2. If I go with separate schemas, I am not sure how I can solve it
>>>>>> using a single generalized consumer. My understanding so far is that I
>>>>>> have to write a consumer for each type of event. Each consumer will read
>>>>>> the whole data, filter out its own events, and then pass
>>>>>> that stream to the sink. But this does not look like a scalable solution: as
>>>>>> new event types keep being added, I have to write a consumer for each new type.
>>>>>>
>>>>>>
>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>         .addSource(
>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                         new
>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                         config).setStartFromEarliest());
>>>>>>
>>>>>> Example :
>>>>>>
>>>>>> * 1st Consumer:*
>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                 config).setStartFromEarliest());
>>>>>> DataStream<GenericRecord> aInput = input.filter(
>>>>>>         record -> "a".equals(String.valueOf(record.get("event_name"))));
>>>>>>
>>>>>> * 2nd Consumer:*
>>>>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                 config).setStartFromEarliest());
>>>>>> DataStream<GenericRecord> bInput = input.filter(
>>>>>>         record -> "b".equals(String.valueOf(record.get("event_name"))));
>>>>>>
>>>>>>
>>>>>> Can you help me solve this using a single consumer, as I do not
>>>>>> want to write a separate consumer for each type of schema?
>>>>>>
>>>>>> For example, this is my consumer that contains different types of
>>>>>> records.
>>>>>>
>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>         .addSource(
>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>                         new
>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>                         config).setStartFromEarliest());
>>>>>>
>>>>>> Now I cannot write this stream directly, as there is no common schema
>>>>>> for the records in this stream. One possible way I am thinking of is:
>>>>>>
>>>>>> 1. Can I create multiple streams from this stream by keying on *"event_name"*
>>>>>> and then write each stream separately?
>>>>>>
>>>>>> I just want to know whether this is possible.
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Anuj
>>>>>>
>>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Anuj,
>>>>>>>
>>>>>>> I originally understood that you would like to store data in the
>>>>>>> same Kafka topic and also want to store it in the same parquet file. In the
>>>>>>> past, I mostly used schema registry with Kafka Streams, where you could
>>>>>>> only store a schema for a key and value respectively. To use different
>>>>>>> record types in the same kafka topic, you had to disable schema
>>>>>>> compatibility checks and just stored the schemas as different versions
>>>>>>> under the same subject.
>>>>>>>
>>>>>>> Your approach is much better. You can ensure full schema
>>>>>>> compatibility. Nevertheless, it still shares the same drawback that
>>>>>>> consumption is much harder (using GenericRecord is proof of that) if you
>>>>>>> want to read/write everything into the same place. Also you will never be
>>>>>>> able to write one consistent file, as they can only have one schema (both
>>>>>>> on Avro and Parquet).
>>>>>>> So you only have two options:
>>>>>>> * keep schemas separated, but then you also need to write separate
>>>>>>> files per record type.
>>>>>>> * have a common schema (either my outlined approach or any other
>>>>>>> wrapper schema).
>>>>>>> The approach with a common schema only makes sense if you want to
>>>>>>> write it into one table/Kafka topic.
>>>>>>>
>>>>>>> However, in the last mail you pointed out that you actually want to
>>>>>>> store the record types separately. In that case, you should keep
>>>>>>> everything separated and have one sink per type, each getting the
>>>>>>> respective schema. Note that you'd need to fetch the schema manually from
>>>>>>> the schema registry when creating the query, as you would need to pass it
>>>>>>> to the sink.
>>>>>>>
>>>>>>> Btw, do you actually have a need to write all events into one Kafka
>>>>>>> topic? The only real use case is to preserve the time order per key.
>>>>>>> Everything else is much more complicated than storing events individually.
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> Arvid
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <aj...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Arvid,
>>>>>>>> Thanks for the quick response. I am new to Avro schema design, so can
>>>>>>>> you please help me understand it and design for my use case?
>>>>>>>>
>>>>>>>> My use case is like this:
>>>>>>>>
>>>>>>>> 1. We have an app where users perform a lot of actions.
>>>>>>>> 2. For each action we collect a set of information that is defined as
>>>>>>>> key-value pairs. We want to define this information with proper schemas
>>>>>>>> so that we maintain a proper format and do not push random data.
>>>>>>>> 3. So we define a schema for each action and register it in the
>>>>>>>> schema registry using topic+record.name as the subject.
>>>>>>>> 4. So I do not think the producer side has any issue, as whenever we
>>>>>>>> push an event to Kafka we register a new schema under the above subject.
>>>>>>>>
>>>>>>>> Example :
>>>>>>>>
>>>>>>>> {
>>>>>>>>   "event_name": "a",
>>>>>>>>   "timestamp": ...,
>>>>>>>>   "properties": {
>>>>>>>>     "key-1": "val-1",
>>>>>>>>     "key-2": "val-2"
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> {
>>>>>>>>   "event_name": "b",
>>>>>>>>   "timestamp": ...,
>>>>>>>>   "properties": {
>>>>>>>>     "key-3": "val-3",
>>>>>>>>     "key-4": "val-4"
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Now I have a consumer that parses the data by fetching the schema
>>>>>>>> from the schema registry and deserializes it into a GenericRecord stream.
>>>>>>>>
>>>>>>>> Why do you think it will break, given that I always deserialize with
>>>>>>>> the writer schema?
>>>>>>>>
>>>>>>>> You suggested keeping one envelope Avro schema instead of a separate
>>>>>>>> schema for each type of event that I generate. I have some doubts
>>>>>>>> about that:
>>>>>>>>
>>>>>>>> 1. How do I enforce a schema on each event when it is a subtype in the
>>>>>>>> main schema? When I get a JSON event of type "a", how do I validate it,
>>>>>>>> convert it to the subschema for "a", and push it to Kafka?
>>>>>>>> 2. I want to create a separate Hive table for each event. If I write
>>>>>>>> this data and, let's say, have 20 events, then 19 of the columns will
>>>>>>>> always hold null values.
>>>>>>>>
>>>>>>>> Please help me do this the right way. It will be a great help and a
>>>>>>>> good learning experience for me.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Anuj
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Anuj,
>>>>>>>>>
>>>>>>>>> you should always avoid having records with different schemas in
>>>>>>>>> the same topic/dataset. You will break the compatibility features of the
>>>>>>>>> schema registry and your consumer/producer code is always hard to maintain.
>>>>>>>>>
>>>>>>>>> A common and scalable way to avoid it is to use some kind of
>>>>>>>>> envelope format.
>>>>>>>>>
>>>>>>>>> {
>>>>>>>>>   "namespace": "example",
>>>>>>>>>   "name": "Envelope",
>>>>>>>>>   "type": "record",
>>>>>>>>>   "fields": [
>>>>>>>>>     {
>>>>>>>>>       "name": "type1",
>>>>>>>>>       "type": ["null", {
>>>>>>>>>         "type": "record",
>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>       }],
>>>>>>>>>       "default": null
>>>>>>>>>     },
>>>>>>>>>     {
>>>>>>>>>       "name": "type2",
>>>>>>>>>       "type": ["null", {
>>>>>>>>>         "type": "record",
>>>>>>>>>         "fields": [ ... ]
>>>>>>>>>       }],
>>>>>>>>>       "default": null
>>>>>>>>>     }
>>>>>>>>>   ]
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>>>>>>> types, which by themselves can be evolved), and adds only a little overhead
>>>>>>>>> (1 byte per subtype). The downside is that you cannot enforce that exactly
>>>>>>>>> one of the subtypes is set.
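
Editorial sketch: since the envelope schema itself cannot express "exactly one subtype is set", that rule has to be checked in application code. The plain-Java example below models the envelope as a map from field name to value. EnvelopeCheck and singleSubtype are hypothetical names for illustration, not part of Avro or Flink; with a real GenericRecord one would presumably walk record.getSchema().getFields() and call record.get(name) instead.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper: the envelope is modelled as a field-name -> value map. */
final class EnvelopeCheck {

    /**
     * Returns the name of the single populated subtype field.
     * Throws if no field or more than one field holds a non-null value.
     */
    static String singleSubtype(Map<String, Object> envelope) {
        List<String> populated = envelope.entrySet().stream()
                .filter(entry -> entry.getValue() != null)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
        if (populated.size() != 1) {
            throw new IllegalArgumentException(
                    "expected exactly one subtype, found " + populated);
        }
        return populated.get(0);
    }
}
```

Running such a check on the producer side, before the record is serialized, keeps malformed envelopes out of the topic in the first place.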
>>>>>>>>>
>>>>>>>>> This schema is fully compatible with the schema registry, so no
>>>>>>>>> need to parse anything manually.
>>>>>>>>>
>>>>>>>>> This schema can easily be used with Parquet. If you can't change
>>>>>>>>> the input format anymore, you can at least use that approach on your output.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>>
>>>>>>>>> Arvid
>>>>>>>>>
>>>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <aj...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have a use case where I receive several different kinds of Avro
>>>>>>>>>> records in Kafka. I am using the schema registry to store the Avro
>>>>>>>>>> schemas. One topic can also contain different types of records.
>>>>>>>>>>
>>>>>>>>>> I have created a GenericRecord stream using KafkaAvroDeserializer by
>>>>>>>>>> defining a custom deserializer class like this:
>>>>>>>>>>
>>>>>>>>>> @Override
>>>>>>>>>> public GenericRecord deserialize(
>>>>>>>>>>         byte[] messageKey, byte[] message, String topic, int partition, long offset) {
>>>>>>>>>>     checkInitialized();
>>>>>>>>>>     return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> private void checkInitialized() {
>>>>>>>>>>     if (inner == null) {
>>>>>>>>>>         Map<String, Object> props = new HashMap<>();
>>>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>>>>>>>>         SchemaRegistryClient client =
>>>>>>>>>>                 new CachedSchemaRegistryClient(
>>>>>>>>>>                         registryUrl,
>>>>>>>>>>                         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>>>         inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>>>     }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> And this is my consumer code :
>>>>>>>>>>
>>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>>         .addSource(
>>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>>                         new
>>>>>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>>>
>>>>>>>>>> Now I want to write this stream partitioned by
>>>>>>>>>> *event_name="a"/year=/month=/day=* in Parquet format so that I
>>>>>>>>>> can expose Hive tables directly on top of this data.
>>>>>>>>>> event_name is a common field in all types of records that I get
>>>>>>>>>> from Kafka.
>>>>>>>>>> I am stuck because the Parquet writer needs a schema to write, but my
>>>>>>>>>> different records have different schemas. So how do I write this stream to
>>>>>>>>>> S3 in the above partition format?
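
Editorial sketch: the partition layout asked for here can be derived from two values per record, the event name and an event timestamp. The plain-Java example below only builds the Hive-style directory string; PartitionPaths and bucketId are hypothetical names, and interpreting the timestamp as epoch milliseconds in UTC is an assumption. In a Flink job this string is roughly what a custom BucketAssigner would return from getBucketId.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

/** Hypothetical helper that builds a Hive-style partition path for one record. */
final class PartitionPaths {

    /** Assumes the timestamp is epoch milliseconds and partitions by UTC date. */
    static String bucketId(String eventName, long epochMillis) {
        ZonedDateTime time = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        return String.format(
                Locale.ROOT,
                "event_name=%s/year=%04d/month=%02d/day=%02d",
                eventName, time.getYear(), time.getMonthValue(), time.getDayOfMonth());
    }
}
```

Event names that can contain "/" or "=" would need escaping before they are used as a directory name.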
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Thanks & Regards,
>>>>>>>>>> Anuj Jain
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>>>

Re: Flink ParquetAvroWriters Sink

Posted by aj <aj...@gmail.com>.
Hi Arvid,

I have implemented the code with the envelope schema as you suggested, but
now I am facing issues with the consumer. I have written code like this:

FlinkKafkaConsumer010<GenericRecord> kafkaConsumer010 =
        new FlinkKafkaConsumer010<>(
                KAFKA_TOPICS,
                new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
                properties);

The deserialization class looks like this:

public class KafkaGenericAvroDeserializationSchema implements
        KeyedDeserializationSchema<GenericRecord> {

    private final String registryUrl;
    private transient KafkaAvroDeserializer inner;

    public KafkaGenericAvroDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    @Override
    public GenericRecord deserialize(byte[] messageKey, byte[] message,
            String topic, int partition, long offset) {
        checkInitialized();
        return (GenericRecord) inner.deserialize(topic, message);
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
            SchemaRegistryClient client =
                    new CachedSchemaRegistryClient(
                            registryUrl,
                            AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroDeserializer(client, props);
        }
    }
}


It works locally on my machine, but when I deploy it on a YARN cluster I
get the exception below:


java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: com.esotericsoftware.kryo.KryoException: Error constructing instance of class: org.apache.avro.Schema$LockableArrayList
Serialization trace:
types (org.apache.avro.Schema$UnionSchema)
schema (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.twitter.chill.Instantiators$$anon$1.newInstance(KryoBase.scala:136)
    at com.esotericsoftware.kryo.Kryo.newInstance(Kryo.java:1061)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.create(CollectionSerializer.java:89)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:93)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:262)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
    ... 13 more
Caused by: java.lang.IllegalAccessException: Class com.twitter.chill.Instantiators$$anonfun$normalJava$1 can not access a member of class org.apache.avro.Schema$LockableArrayList with modifiers "public"

Please help me to resolve this issue.

Thanks,
Anuj





On Mon, Jan 20, 2020 at 9:42 PM aj <aj...@gmail.com> wrote:

> Thanks, Arvid for all the clarification. I will work on the approach you
> suggested.
>
> Thanks,
> Anuj
>
> On Sat, Jan 18, 2020 at 10:17 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Anuj,
>>
>> I think that there may be a fundamental misunderstanding about the role
>> of a schema registry in Kafka. So let me first clarify that.
>> In each Avro/Parquet file, all records have the same schema. The schema
>> is stored within the file, such that we can always retrieve the writer
>> schema for the records.
>> When Avro was first applied to Kafka, there was the basic question of how
>> the writer schema for any record is made known to the consumer. Storing the
>> complete schema on each record would mean that each record would be much
>> larger than needed. Hence, they added the schema registry, which assigns a
>> unique id to each schema; that id is then embedded into the records.
>> Now, whenever I update a schema in my producer, I would have old records
>> with the old schema id and new records with the new schema id.
>> In my consumer, I'd use a fixed reader schema, such that my application
>> would not need to worry if the record is written with old or new schema; my
>> consumer would only see records with the reader schema.
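
Editorial sketch: the embedded schema id mentioned above can be read straight from the message bytes. The example assumes the Confluent wire format, in which each value starts with a magic byte 0 followed by the schema id as a 4-byte big-endian integer and then the Avro payload. WireFormat and schemaId are hypothetical names; in practice KafkaAvroDeserializer does this lookup internally.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper that reads the schema id from a Confluent-framed message. */
final class WireFormat {

    private static final byte MAGIC_BYTE = 0x0;
    private static final int HEADER_LENGTH = 5; // 1 magic byte + 4 id bytes

    /** Returns the schema id stored in bytes 1..4 of the message. */
    static int schemaId(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException("message too short for a schema id header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buffer.getInt();
    }
}
```

The id identifies the writer schema only. Which reader schema the consumer applies is a separate decision, which is the point made in the paragraph above.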
>>
>> Given that background information, you see that in general, it's
>> impossible with a generic approach to write the parquet with the same
>> schema as it has been written in Kafka: the parquet schema needs to be
>> supplied statically during query compilation while the actual used Avro
>> schema in Kafka is only known when actually consuming data.
>>
>> But looking further down the road:
>> * since you need one schema to write the parquet files, you'd need to
>> decide: do you want to write with the new or the old schema in case of a
>> schema update? That should also be the reader schema of your application
>> for a given event type.
>> * this decision has further implications: your application needs to
>> extract exactly one specific version of the schema from the schema registry
>> at query compilation. That could be either a specific schema id or the
>> latest schema for the event type.
>> * that means that the output schema is locked until you restart your
>> application and fetch a new latest schema in case of an update.
>> * at that point, it might just be easier to use the approach that I
>> outlined previously by bundling a specific schema with your application.
>>
>> If you want to extract the latest schema for a subject:
>>
>> var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
>> // Fetch the metadata of the latest version registered under the subject.
>> var metadata = registryClient.getLatestSchemaMetadata(<subject>);
>> var schema = new Schema.Parser().parse(metadata.getSchema());
>>
>>
>> On Sat, Jan 18, 2020 at 5:22 PM aj <aj...@gmail.com> wrote:
>>
>>> Thanks, Arvid.
>>>
>>> I do not fully understand the above approach, so for now I am thinking
>>> of going with the envelope approach that you suggested.
>>>
>>> One more question: I do not want to keep the schema in my consumer
>>> project, even if it is a single envelope schema. I want it to be fetched
>>> from the schema registry and passed to my Parquet sink so that I always use
>>> the same schema as the producer. Can you provide sample code showing how I
>>> can infer the schema from the GenericRecord or get it from the schema
>>> registry?
>>>
>>>
>>> Regards,
>>> Anuj
>>>
>>>
>>>
>>> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> (Readded user mailing list)
>>>>
>>>> Hi Anuj,
>>>>
>>>> since I'd still recommend going with distinct sources/sinks, let me try
>>>> to solve your issues in this mail. If that doesn't work out, I'd address
>>>> your concerns about the envelope format later.
>>>>
>>>> In Flink, you can have several subtopologies in the same application.
>>>>
>>>> Thus, for each event type, you can add
>>>> AvroSource(eventType) -> generic transformation/validation ->
>>>> AvroSink(eventType).
>>>>
>>>> I'd put all Avro schemas in one project and use an Avro plugin to
>>>> generate the respective Java classes. Then I'd simply create a map of Avro
>>>> schema (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
>>>> (event-a, event-b, ...).
>>>> Next, I'd iterate over the map to add the respective subtopologies to
>>>> env.
>>>> Finally, execute everything.
>>>>
>>>> You have one project where all validations reside. But you'd have
>>>> almost no overhead to process a given source of eventType. The downside of
>>>> that approach is of course, that each new event type would require a
>>>> redeployment, but that seems like what you'd want to do anyhow.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Sat, Jan 18, 2020 at 2:08 PM aj <aj...@gmail.com> wrote:
>>>>
>>>>> Thanks, Arvid.
>>>>>
>>>>> 1. I like your approach, as I can write a single consumer and put the
>>>>> data in S3 in Parquet format. The only challenge is that there are extra
>>>>> columns that will always be null, as each record holds only one type of
>>>>> event.
>>>>>
>>>>> 2. If I go with separate schemas, I am not sure how I can solve it
>>>>> using a single generalized consumer. My understanding so far is that I
>>>>> have to write a consumer for each type of event. Each consumer will read
>>>>> the whole data, filter its respective events, and then pass that stream
>>>>> to a sink. But this does not look like a scalable solution: as new event
>>>>> types keep being added, I have to write a consumer for each new type.
>>>>>
>>>>>
>>>>> DataStreamSource<GenericRecord> input = env
>>>>>         .addSource(
>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>                         new
>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>                         config).setStartFromEarliest());
>>>>>
>>>>> Example :
>>>>>
>>>>> * 1st Consumer:*
>>>>>                   DataStreamSource<GenericRecord> input =
>>>>> env.addSource(
>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>                         new
>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>                         config).setStartFromEarliest());
>>>>>                 DataStream<GenericRecord> aInput =
>>>>> input.filter(r -> "a".equals(String.valueOf(r.get("event_name"))));
>>>>>
>>>>> * 2nd Consumer:*
>>>>>   DataStreamSource<GenericRecord> input = env.addSource(
>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>                         new
>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>                         config).setStartFromEarliest());
>>>>>                 DataStream<GenericRecord> bInput =
>>>>> input.filter(r -> "b".equals(String.valueOf(r.get("event_name"))));
>>>>>
>>>>>
>>>>> Can you help me solve this with a single consumer? I do not want to
>>>>> write a separate consumer for each type of schema.
>>>>>
>>>>> For example, this is my consumer that contains different types of
>>>>> records.
>>>>>
>>>>> DataStreamSource<GenericRecord> input = env
>>>>>         .addSource(
>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>                         new
>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>                         config).setStartFromEarliest());
>>>>>
>>>>> Now I cannot write this stream directly, as the records in it have no
>>>>> common schema. One possible way I am thinking of is:
>>>>>
>>>>> 1. Create multiple streams from this stream by keying on "event_name"
>>>>> and then write each stream separately.
>>>>>
>>>>> I just want to know whether this is possible.
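
Editorial sketch: the idea of splitting one mixed stream by "event_name" can be modelled in plain Java as below. This is not Flink code: records are modelled as maps, and EventRouter and routeByEventName are hypothetical names. In Flink itself, keyBy only partitions a stream and still yields a single stream, so the per-type split would more likely be done with one filter per event type or with side outputs.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical plain-Java model of routing mixed records to one group per event type. */
final class EventRouter {

    /** Groups records by their "event_name" field, keeping first-seen order of names. */
    static Map<String, List<Map<String, Object>>> routeByEventName(
            List<Map<String, Object>> records) {
        Map<String, List<Map<String, Object>>> routed = new LinkedHashMap<>();
        for (Map<String, Object> record : records) {
            // String.valueOf also handles non-String values such as Avro's Utf8.
            String eventName = String.valueOf(record.get("event_name"));
            routed.computeIfAbsent(eventName, name -> new ArrayList<>()).add(record);
        }
        return routed;
    }
}
```

Each resulting group has a single schema, so each can be handed to its own writer configured with that schema.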
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Anuj
>>>>>
>>>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Anuj,
>>>>>>
>>>>>> I originally understood that you would like to store data in the same
>>>>>> Kafka topic and also want to store it in the same parquet file. In the
>>>>>> past, I mostly used schema registry with Kafka Streams, where you could
>>>>>> only store a schema for a key and value respectively. To use different
>>>>>> record types in the same kafka topic, you had to disable schema
>>>>>> compatibility checks and just stored the schemas as different versions
>>>>>> under the same subject.
>>>>>>
>>>>>> Your approach is much better. You can ensure full schema
>>>>>> compatibility. Nevertheless, it still shares the same drawback that
>>>>>> consumption is much harder (using GenericRecord is proof of that) if you
>>>>>> want to read/write everything into the same place. Also you will never be
>>>>>> able to write one consistent file, as they can only have one schema (both
>>>>>> on Avro and Parquet).
>>>>>> So you only have two options:
>>>>>> * keep schemas separated, but then you also need to write separate
>>>>>> files per record type.
>>>>>> * have a common schema (either my outlined approach or any other
>>>>>> wrapper schema).
>>>>>> The approach with a common schema only makes sense if you want to
>>>>>> write it into one table/Kafka topic.
>>>>>>
>>>>>> However, in the last mail you pointed out that you actually want to
>>>>>> store the record types separately. In that case, you should keep
>>>>>> everything separated and have one sink per type, each getting the
>>>>>> respective schema. Note that you'd need to fetch the schema manually from
>>>>>> the schema registry when creating the query, as you would need to pass it
>>>>>> to the sink.
>>>>>>
>>>>>> Btw, do you actually have a need to write all events into one Kafka
>>>>>> topic? The only real use case is to preserve the time order per key.
>>>>>> Everything else is much more complicated than storing events individually.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Arvid
>>>>>>
>>>>>>
>>>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <aj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Arvid,
>>>>>>> Thanks for the quick response. I am new to Avro schema design, so can
>>>>>>> you please help me understand it and design for my use case?
>>>>>>>
>>>>>>> My use case is like this:
>>>>>>>
>>>>>>> 1. We have an app where users perform a lot of actions.
>>>>>>> 2. For each action we collect a set of information that is defined as
>>>>>>> key-value pairs. We want to define this information with proper schemas
>>>>>>> so that we maintain a proper format and do not push random data.
>>>>>>> 3. So we define a schema for each action and register it in the
>>>>>>> schema registry using topic+record.name as the subject.
>>>>>>> 4. So I do not think the producer side has any issue, as whenever we
>>>>>>> push an event to Kafka we register a new schema under the above subject.
>>>>>>>
>>>>>>> Example :
>>>>>>>
>>>>>>> {
>>>>>>>   "event_name": "a",
>>>>>>>   "timestamp": ...,
>>>>>>>   "properties": {
>>>>>>>     "key-1": "val-1",
>>>>>>>     "key-2": "val-2"
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> {
>>>>>>>   "event_name": "b",
>>>>>>>   "timestamp": ...,
>>>>>>>   "properties": {
>>>>>>>     "key-3": "val-3",
>>>>>>>     "key-4": "val-4"
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> Now I have a consumer that parses the data by fetching the schema
>>>>>>> from the schema registry and deserializes it into a GenericRecord stream.
>>>>>>>
>>>>>>> Why do you think it will break, given that I always deserialize with
>>>>>>> the writer schema?
>>>>>>>
>>>>>>> You suggested keeping one envelope Avro schema instead of a separate
>>>>>>> schema for each type of event that I generate. I have some doubts
>>>>>>> about that:
>>>>>>>
>>>>>>> 1. How do I enforce a schema on each event when it is a subtype in the
>>>>>>> main schema? When I get a JSON event of type "a", how do I validate it,
>>>>>>> convert it to the subschema for "a", and push it to Kafka?
>>>>>>> 2. I want to create a separate Hive table for each event. If I write
>>>>>>> this data and, let's say, have 20 events, then 19 of the columns will
>>>>>>> always hold null values.
>>>>>>>
>>>>>>> Please help me do this the right way. It will be a great help and a
>>>>>>> good learning experience for me.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Anuj
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Anuj,
>>>>>>>>
>>>>>>>> you should always avoid having records with different schemas in
>>>>>>>> the same topic/dataset. You will break the compatibility features of the
>>>>>>>> schema registry and your consumer/producer code is always hard to maintain.
>>>>>>>>
>>>>>>>> A common and scalable way to avoid it is to use some kind of
>>>>>>>> envelope format.
>>>>>>>>
>>>>>>>> {
>>>>>>>>   "namespace": "example",
>>>>>>>>   "name": "Envelope",
>>>>>>>>   "type": "record",
>>>>>>>>   "fields": [
>>>>>>>>     {
>>>>>>>>       "name": "type1",
>>>>>>>>       "type": ["null", {
>>>>>>>>         "type": "record",
>>>>>>>>         "fields": [ ... ]
>>>>>>>>       }],
>>>>>>>>       "default": null
>>>>>>>>     },
>>>>>>>>     {
>>>>>>>>       "name": "type2",
>>>>>>>>       "type": ["null", {
>>>>>>>>         "type": "record",
>>>>>>>>         "fields": [ ... ]
>>>>>>>>       }],
>>>>>>>>       "default": null
>>>>>>>>     }
>>>>>>>>   ]
>>>>>>>> }
>>>>>>>>
>>>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>>>>>> types, which by themselves can be evolved), and adds only a little overhead
>>>>>>>> (1 byte per subtype). The downside is that you cannot enforce that exactly
>>>>>>>> one of the subtypes is set.
>>>>>>>>
>>>>>>>> This schema is fully compatible with the schema registry, so no
>>>>>>>> need to parse anything manually.
>>>>>>>>
>>>>>>>> This schema can easily be used with Parquet. If you can't change
>>>>>>>> the input format anymore, you can at least use that approach on your output.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>>
>>>>>>>> Arvid
>>>>>>>>
>>>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <aj...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi All,
>>>>>>>>>
>>>>>>>>> I have a use case where I receive several different kinds of Avro
>>>>>>>>> records in Kafka. I am using the schema registry to store the Avro
>>>>>>>>> schemas. One topic can also contain different types of records.
>>>>>>>>>
>>>>>>>>> I have created a GenericRecord stream using KafkaAvroDeserializer by
>>>>>>>>> defining a custom deserializer class like this:
>>>>>>>>>
>>>>>>>>> @Override
>>>>>>>>> public GenericRecord deserialize(
>>>>>>>>>         byte[] messageKey, byte[] message, String topic, int partition, long offset) {
>>>>>>>>>     checkInitialized();
>>>>>>>>>     return (GenericRecord) inner.deserialize(topic, message);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> private void checkInitialized() {
>>>>>>>>>     if (inner == null) {
>>>>>>>>>         Map<String, Object> props = new HashMap<>();
>>>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>>>>>>>         SchemaRegistryClient client =
>>>>>>>>>                 new CachedSchemaRegistryClient(
>>>>>>>>>                         registryUrl,
>>>>>>>>>                         AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>>>         inner = new KafkaAvroDeserializer(client, props);
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> And this is my consumer code :
>>>>>>>>>
>>>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>>>         .addSource(
>>>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>>>                         new
>>>>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>>>                         config).setStartFromEarliest());
>>>>>>>>>
>>>>>>>>> Now I want to write this stream partitioned on
>>>>>>>>> *event_name="a"/year=/month=/day=* in parquet format so that I
>>>>>>>>> can expose hive tables directly on top of this data.
>>>>>>>>> event_name is a common field for all types of records that I am
>>>>>>>>> getting in Kafka.
>>>>>>>>> I am stuck, as the parquet writer needs a schema to write but my
>>>>>>>>> different records have different schemas. So how do I write this
>>>>>>>>> stream to S3 in the above partition format?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Thanks & Regards,
>>>>>>>>> Anuj Jain
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Thanks & Regards,
>>>>>>> Anuj Jain
>>>>>>> Mob. : +91- 8588817877
>>>>>>> Skype : anuj.jain07
>>>>>>> <http://www.oracle.com/>
>>>>>>>
>>>>>>>
>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Flink ParquetAvroWriters Sink

Posted by aj <aj...@gmail.com>.
Thanks, Arvid, for all the clarification. I will work on the approach you
suggested.

Thanks,
Anuj


-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Flink ParquetAvroWriters Sink

Posted by Arvid Heise <ar...@ververica.com>.
Hi Anuj,

I think that there may be a fundamental misunderstanding about the role of
a schema registry in Kafka. So let me first clarify that.
In each Avro/Parquet file, all records have the same schema. The schema is
stored within the file, such that we can always retrieve the writer schema
for the records.
When Avro was first applied to Kafka, there was the basic question of how
the writer schema for any record is known to the consumer. Storing the
complete schema on each record would mean that each record would be much
larger than needed. Hence, they added the schema registry, which assigns a
unique id to each schema; that id is then embedded into the records.
Now, whenever I update a schema in my producer, I would have old records
with the old schema id and new records with the new schema id.
In my consumer, I'd use a fixed reader schema, such that my application
would not need to worry if the record is written with old or new schema; my
consumer would only see records with the reader schema.
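
As an illustration of the embedded schema id, here is a minimal sketch in
plain Java. It assumes the Confluent wire format (one magic byte 0, then a
4-byte big-endian schema id, then the Avro payload); the class and method
names are made up for this example and are not part of any library.

```java
import java.nio.ByteBuffer;

final class WireFormatHeader {
    // Assumption: Confluent framing starts with a single zero byte.
    static final byte MAGIC_BYTE = 0x0;

    /** Returns the schema id stored in bytes 1..4 of a registry-framed message. */
    static int schemaId(byte[] message) {
        if (message == null || message.length < 5) {
            throw new IllegalArgumentException("message too short for a 5-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Header (magic byte + id 42) followed by two placeholder payload bytes.
        byte[] framed = ByteBuffer.allocate(7)
                .put(MAGIC_BYTE)
                .putInt(42)
                .put(new byte[] {1, 2})
                .array();
        System.out.println(schemaId(framed)); // prints 42
    }
}
```

A deserializer such as KafkaAvroDeserializer does this lookup internally, so
application code normally never parses the header itself.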

Given that background information, you see that in general, it's impossible
with a generic approach to write the Parquet files with the same schema that
was used to write the records in Kafka: the Parquet schema needs to be
supplied statically during query compilation, while the Avro schema actually
used in Kafka is only known when actually consuming data.

But looking further down the road:
* since you need one schema to write the parquet files, you'd need to
decide: do you want to write with the new or the old schema in case of a
schema update? That should also be the reader schema of your application
for a given event type.
* this decision has further implications: your application needs to extract
exactly one specific version of the schema from the schema registry at
query compilation. That could be either a specific schema id or the latest
schema for the event type.
* that means that the output schema is locked until you restart your
application and fetch a new latest schema in case of an update.
* at that point, it might just be easier to use the approach that I
outlined previously by bundling a specific schema with your application.
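
To make the effect of a fixed reader schema concrete, here is a simplified
sketch in plain Java, without the Avro library. It mimics the core of Avro
schema resolution under stated assumptions: fields are matched by name,
writer fields unknown to the reader are dropped, and fields missing from the
written record take the reader's default. The class name, the map-based
record model, and the example fields (user, country, debug) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class ReaderSchemaProjection {

    /**
     * Projects a written record onto the reader schema.
     * readerDefaults maps each reader field name to its default value,
     * in reader-schema order.
     */
    static Map<String, Object> project(
            Map<String, Object> written, Map<String, Object> readerDefaults) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerDefaults.entrySet()) {
            String name = field.getKey();
            // Take the written value if present, otherwise the reader default.
            Object value = written.containsKey(name) ? written.get(name) : field.getValue();
            result.put(name, value);
        }
        return result;
    }

    public static void main(String[] args) {
        // Reader schema: "country" is assumed to have been added in a newer version.
        Map<String, Object> readerDefaults = new LinkedHashMap<>();
        readerDefaults.put("user", "");
        readerDefaults.put("country", "unknown");

        // Old record: written before "country" existed, with a field the reader ignores.
        Map<String, Object> oldRecord = new LinkedHashMap<>();
        oldRecord.put("user", "u1");
        oldRecord.put("debug", true);

        System.out.println(project(oldRecord, readerDefaults)); // {user=u1, country=unknown}
    }
}
```

Because every record comes out with the reader's fields, a single Parquet
schema per event type is enough for the sink, whichever schema version the
producer used.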

If you want to extract the latest schema for a subject:

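var registryClient = new CachedSchemaRegistryClient(<schemaRepoUrls>, 1000);
// getAllVersions(<subject>) returns version numbers, not schema ids, so they
// cannot be passed to a lookup by id; ask for the latest version's metadata.
var metadata = registryClient.getLatestSchemaMetadata(<subject>);
var schema = new org.apache.avro.Schema.Parser().parse(metadata.getSchema());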


On Sat, Jan 18, 2020 at 5:22 PM aj <aj...@gmail.com> wrote:

> Thanks, Arvid.
>
> I do not fully understand the above approach, so for now I am thinking of
> going with the envelope approach that you suggested.
>
> One more question: I do not want to keep the schema in my consumer project,
> even if it is a single envelope schema. I want it to be fetched from the
> schema registry and passed to my parquet-sink so that I always use the same
> schema that is used by the producer. Can you provide sample code showing how
> I can infer the schema from the generic record or get it from the schema
> registry?
>
>
> Regards,
> Anuj
>
>
>
> On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> (Readded user mailing list)
>>
>> Hi Anuj,
>>
>> since I'd still recommend going with distinct sources/sinks, let me try
>> to solve your issues in this mail. If that doesn't work out, I'd address
>> your concerns about the envelope format later.
>>
>> In Flink, you can have several subtopologies in the same application.
>>
>> Thus, for each event type, you can add
>> AvroSource(eventType) -> generic transformation/validation ->
>> AvroSink(eventType)
>> for each event.
>>
>> I'd put all Avro schemas in one project and use an Avro plugin to generate
>> the respective Java classes. Then I'd simply create a map of Avro schema
>> (GeneratedClassA.getClassSchema(), GeneratedClassB.getClassSchema(), ...)
>> to topic name (event-a, event-b, ...).
>> Next, I'd iterate over the map to add the respective subtopologies to
>> env.
>> Finally, execute everything.
>>
>> You have one project where all validations reside. But you'd have almost
>> no overhead to process a given source of eventType. The downside of that
>> approach is of course, that each new event type would require a
>> redeployment, but that seems like what you'd want to do anyhow.
>>
>> Best,
>>
>> Arvid
>>
>> On Sat, Jan 18, 2020 at 2:08 PM aj <aj...@gmail.com> wrote:
>>
>>> Thanks, Arvid.
>>>
>>> 1. I like your approach as I can write a single consumer and put the
>>> data in S3 in parquet format. The only challenge is that there are extra
>>> columns that are always going to be null, as each record carries only one
>>> type of event.
>>>
>>> 2. If I go with separate schemas, I am not sure how I can solve it using
>>> a single generalized consumer. My understanding so far is that I have to
>>> write a consumer for each type of event. Each consumer will read the whole
>>> data, filter out its respective events, and then pass that stream to the
>>> sink. But this does not look like a scalable solution: as new event types
>>> keep being added, I have to write a consumer for each new type.
>>>
>>>
>>> DataStreamSource<GenericRecord> input = env
>>>         .addSource(
>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>                         config).setStartFromEarliest());
>>>
>>> Example :
>>>
>>> 1st Consumer:
>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>                 config).setStartFromEarliest());
>>> // GenericRecord.get returns an Object (often Utf8), so compare its string form
>>> DataStream<GenericRecord> aInput =
>>>         input.filter(r -> "a".equals(String.valueOf(r.get("event_name"))));
>>>
>>> 2nd Consumer:
>>> DataStreamSource<GenericRecord> input = env.addSource(
>>>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>                 config).setStartFromEarliest());
>>> DataStream<GenericRecord> bInput =
>>>         input.filter(r -> "b".equals(String.valueOf(r.get("event_name"))));
>>>
>>>
>>> Can you help me solve this using a single consumer, as I do not
>>> want to write a separate consumer for each type of schema?
>>>
>>> For example, this is my consumer that contains different types of
>>> records.
>>>
>>> DataStreamSource<GenericRecord> input = env
>>>         .addSource(
>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>                         new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>                         config).setStartFromEarliest());
>>>
>>> Now I cannot write this stream directly, as there is no common schema for
>>> the records in this stream. One possible way I am thinking of is:
>>>
>>> 1. Can I create multiple streams from this stream using keyBy on
>>> "event_name" and then write each stream separately?
>>>
>>> I just want to know whether this is possible.
>>>
>>>
>>> Thanks,
>>> Anuj
>>>
>>> On Fri, Jan 17, 2020 at 3:20 PM Arvid Heise <ar...@ververica.com> wrote:
>>>
>>>> Hi Anuj,
>>>>
>>>> I originally understood that you would like to store data in the same
>>>> Kafka topic and also want to store it in the same parquet file. In the
>>>> past, I mostly used schema registry with Kafka Streams, where you could
>>>> only store a schema for a key and value respectively. To use different
>>>> record types in the same kafka topic, you had to disable schema
>>>> compatibility checks and just stored the schemas as different versions
>>>> under the same subject.
>>>>
>>>> Your approach is much better. You can ensure full schema compatibility.
>>>> Nevertheless, it still shares the same drawback that consumption is much
>>>> harder (using GenericRecord is proof of that) if you want to read/write
>>>> everything into the same place. Also you will never be able to write one
>>>> consistent file, as they can only have one schema (both on Avro and
>>>> Parquet).
>>>> So you only have two options:
>>>> * keep schemas separated, but then you also need to write separate
>>>> files per record type.
>>>> * have a common schema (either my outlined approach or any other
>>>> wrapper schema).
>>>> The approach with a common schema only makes sense if you want to write
>>>> it into one table/kafka topic.
>>>>
>>>> However, in the last mail you pointed out that you actually want to
>>>> store the record types separately. Then, you should keep everything
>>>> separated, with a sink for each type, each getting the respective
>>>> schema. Note that you'd need to fetch the schema manually from the schema
>>>> registry when creating the query as you would need to pass it to the sink.
>>>>
>>>> Btw, do you actually have a need to write all events into one Kafka
>>>> topic? The only real use case is to preserve the time order per key.
>>>> Everything else is much more complicated than storing events individually.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>>
>>>> On Thu, Jan 16, 2020 at 3:39 PM aj <aj...@gmail.com> wrote:
>>>>
>>>>> Hi Arvid,
>>>>> Thanks for the quick response. I am new to this Avro design, so can you
>>>>> please help me understand it and design for my use case?
>>>>>
>>>>> My use case is like this:
>>>>>
>>>>> 1. We have an app where a lot of actions happen on the user side.
>>>>> 2. For each action we collect a set of information that is defined
>>>>> using key-value pairs. We want to define this information with proper
>>>>> schemas so that we maintain the proper format and do not push random
>>>>> data.
>>>>> 3. So we define a schema for each action and register it in the
>>>>> schema registry using topic+record.name as the subject.
>>>>> 4. So I do not think the producer side has any issue, as whenever we
>>>>> push an event to Kafka we register a new schema with the above subject.
>>>>>
>>>>> Example :
>>>>>
>>>>> {
>>>>> "event_name" : "a"
>>>>> "timestamp":
>>>>> "properties"  :[
>>>>>   "key-1" : "val-1"
>>>>>   "key-2" : "val-2"
>>>>> ]
>>>>> }
>>>>>
>>>>> {
>>>>> "event_name" : "b"
>>>>> "timestamp":
>>>>> "properties"  :[
>>>>>   "key-3" : "val-3"
>>>>>   "key-4" : "val-4"
>>>>> ]
>>>>> }
>>>>>
>>>>> Now I have a consumer that parses the data by fetching the schema
>>>>> from the schema registry and deserializes it into a GenericRecord stream.
>>>>>
>>>>> Why do you think it will break, given that I am always deserializing
>>>>> with the writer schema?
>>>>>
>>>>> You suggested keeping an envelope Avro schema rather than a separate
>>>>> schema for each type of event that I am generating. I have some doubts
>>>>> about that:
>>>>>
>>>>> 1. How do I enforce a schema on each event when it is a subtype in the
>>>>> main schema? When I get a JSON event of type "a", how do I enforce and
>>>>> convert it to the subschema of type "a" and push it to Kafka?
>>>>> 2. I want to create a separate hive table for each of the events. If I
>>>>> write this data and, let's say, I have 20 events, then 19 of the columns
>>>>> will always contain null values.
>>>>>
>>>>> Please help me do this the right way. It will be a great help and
>>>>> learning for me.
>>>>>
>>>>> Thanks,
>>>>> Anuj
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Anuj,
>>>>>>
>>>>>> you should always avoid having records with different schemas in the
>>>>>> same topic/dataset. You will break the compatibility features of the schema
>>>>>> registry, and your consumer/producer code will always be hard to maintain.
>>>>>>
>>>>>> A common and scalable way to avoid it is to use some kind of envelope
>>>>>> format.
>>>>>>
>>>>>> {
>>>>>>   "namespace": "example",
>>>>>>   "name": "Envelope",
>>>>>>   "type": "record",
>>>>>>   "fields": [
>>>>>>     {
>>>>>>       "name": "type1",
>>>>>>       "type": ["null", {
>>>>>>         "type": "record",
>>>>>>         "fields": [ ... ]
>>>>>>       }],
>>>>>>       "default": null
>>>>>>     },
>>>>>>     {
>>>>>>       "name": "type2",
>>>>>>       "type": ["null", {
>>>>>>         "type": "record",
>>>>>>         "fields": [ ... ]
>>>>>>       }],
>>>>>>       "default": null
>>>>>>     }
>>>>>>   ]
>>>>>> }
>>>>>>
>>>>>> This envelope is evolvable (arbitrary addition/removal of wrapped
>>>>>> types, which by themselves can be evolved), and adds only a little overhead
>>>>>> (1 byte per subtype). The downside is that you cannot enforce that exactly
>>>>>> one of the subtypes is set.
>>>>>>
>>>>>> This schema is fully compatible with the schema registry, so no need
>>>>>> to parse anything manually.
>>>>>>
>>>>>> This schema can easily be used with Parquet. If you can't change the
>>>>>> input format anymore, you can at least use that approach on your output.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Arvid
>>>>>>
>>>>>> On Thu, Jan 16, 2020 at 2:53 PM aj <aj...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> I have a use case where I am getting different sets of Avro records
>>>>>>> in Kafka. I am using the schema registry to store the Avro schemas. One
>>>>>>> topic can also have different types of records.
>>>>>>>
>>>>>>> I have created a GenericRecord stream using KafkaAvroDeserializer
>>>>>>> by defining a custom deserializer class like this:
>>>>>>>
>>>>>>> @Override
>>>>>>> public GenericRecord deserialize(
>>>>>>>         byte[] messageKey, byte[] message, String topic, int partition, long offset) {
>>>>>>>     checkInitialized();
>>>>>>>     return (GenericRecord) inner.deserialize(topic, message);
>>>>>>> }
>>>>>>>
>>>>>>> private void checkInitialized() {
>>>>>>>     if (inner == null) {
>>>>>>>         Map<String, Object> props = new HashMap<>();
>>>>>>>         props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
>>>>>>>         props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>>>>>>>         SchemaRegistryClient client =
>>>>>>>                 new CachedSchemaRegistryClient(
>>>>>>>                         registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>>>>>>>         inner = new KafkaAvroDeserializer(client, props);
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> And this is my consumer code :
>>>>>>>
>>>>>>> DataStreamSource<GenericRecord> input = env
>>>>>>>         .addSource(
>>>>>>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>>>>>>                         new
>>>>>>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>>>>>>                         config).setStartFromEarliest());
>>>>>>>
>>>>>>> Now I want to write this stream partition on
>>>>>>> *event_name="a"/year=/month=/day=* in parquet format so that I can
>>>>>>> expose hive tables directly on top of this data.
>>>>>>> event_name is common field for all types of records that I am
>>>>>>> getting in Kafka.
>>>>>>> I am stuck as parquet writer needs a schema to write but my
>>>>>>> different records have different schemas  So how do I write this stream in
>>>>>>> s3 in above partition format.
>>>>>>>
>>>>>>>
>>>>>>> Thanks & Regards,
>>>>>>> Anuj Jain
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Thanks & Regards,
>>>>> Anuj Jain
>>>>> Mob. : +91- 8588817877
>>>>> Skype : anuj.jain07
>>>>> <http://www.oracle.com/>
>>>>>
>>>>>
>>>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>>>
>>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Anuj Jain
>>> Mob. : +91- 8588817877
>>> Skype : anuj.jain07
>>> <http://www.oracle.com/>
>>>
>>>
>>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>>
>>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>

Re: Flink ParquetAvroWriters Sink

Posted by aj <aj...@gmail.com>.
Thanks, Arvid.

I do not fully understand the approach from your last mail, so for now I am
planning to go with the envelope approach that you suggested.

One more question: I do not want to keep the schema in my consumer project,
even if it is a single envelope schema. I want to fetch it from the schema
registry and pass it to my Parquet sink, so that I always use the same schema
as the producer. Can you provide sample code showing how to infer the schema
from the GenericRecord or get it from the schema registry?
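
For reference, a minimal sketch of the registry side of this question. On the
record side, GenericRecord exposes its schema through getSchema(). The sketch
below uses only the JDK (Java 11+) and the registry's REST endpoint
GET /subjects/{subject}/versions/latest. The class and method names are
hypothetical, and the "<topic>-<record name>" subject format is an assumption
based on the topic+record.name naming described in this thread. The response
is JSON whose "schema" field holds the Avro schema as a string, which would
still have to be extracted and parsed before it is handed to the sink.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

final class SchemaRegistryLookup {

    /** Assumed subject naming: "<topic>-<record name>". */
    static String subjectFor(String topic, String recordName) {
        return topic + "-" + recordName;
    }

    /** Builds the URI of the latest registered version of a subject. */
    static URI latestVersionUri(String registryUrl, String subject) {
        // Drop a trailing slash so the path is not doubled up.
        String base = registryUrl.endsWith("/")
                ? registryUrl.substring(0, registryUrl.length() - 1)
                : registryUrl;
        String encoded = URLEncoder.encode(subject, StandardCharsets.UTF_8);
        return URI.create(base + "/subjects/" + encoded + "/versions/latest");
    }

    /** Returns the raw JSON response; its "schema" field is the Avro schema string. */
    static String fetchLatest(String registryUrl, String subject)
            throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(latestVersionUri(registryUrl, subject))
                .header("Accept", "application/vnd.schemaregistry.v1+json")
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IOException("Schema registry returned HTTP " + response.statusCode());
        }
        return response.body();
    }
}
```

Fetching the schema once, while the job is being assembled, matches the point
made elsewhere in this thread that the sink needs its schema before any record
arrives.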


Regards,
Anuj



On Sat, Jan 18, 2020 at 6:55 PM Arvid Heise <ar...@ververica.com> wrote:

> (Readded user mailing list)
>
> Hi Anuj,
>
> since I'd still recommend going with distinct sources/sinks, let me try to
> solve your issues in this mail. If that doesn't work out, I'd address your
> concerns about the envelope format later.
>
> In Flink, you can have several subtopologies in the same application.
>
> Thus, for each event type, you can add
> AvroSource(eventType) -> generic transformation/validation ->
> AvroSink(eventType)
> for each event.
>
> I'd put all avro schema in one project and use an avro plugin to generate
> the respective Java Classes. Then I'd simply create a map of Avro Schema
> (GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA...) to topic name
> (event-a, event-b, ...).
> Next, I'd iterate over the list to add the respective subtopologies to env.
> Finally, execute everything.
>
> You have one project where all validations reside. But you'd have almost
> no overhead to process a given source of eventType. The downside of that
> approach is of course, that each new event type would require a
> redeployment, but that seems like what you'd want to do anyhow.
>
> Best,
>
> Arvid
>

-- 
Thanks & Regards,
Anuj Jain
Mob. : +91- 8588817877
Skype : anuj.jain07
<http://www.oracle.com/>


<http://www.cse.iitm.ac.in/%7Eanujjain/>

Re: Flink ParquetAvroWriters Sink

Posted by Arvid Heise <ar...@ververica.com>.
(Re-added the user mailing list)

Hi Anuj,

since I'd still recommend going with distinct sources/sinks, let me try to
solve your issues in this mail. If that doesn't work out, I'll address your
concerns about the envelope format later.

In Flink, you can have several subtopologies in the same application.

Thus, for each event type, you can add a chain of the form
AvroSource(eventType) -> generic transformation/validation ->
AvroSink(eventType).

I'd put all Avro schemas in one project and use an Avro plugin to generate
the respective Java classes. Then I'd simply create a map of Avro schema
(GeneratedClassA::SCHEMA, GeneratedClassB::SCHEMA, ...) to topic name
(event-a, event-b, ...).
Next, I'd iterate over the map to add the respective subtopologies to env.
Finally, execute everything.
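
The loop described above could be sketched roughly as follows. This is plain
Java with no Flink dependency: Subtopology is a hypothetical stand-in for the
source -> validation -> sink chain that would really be added to env, schemas
are represented by their names only, and the s3://bucket/events base path used
in the usage note is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class PerEventTopologies {

    /** Hypothetical description of one source -> validation -> sink chain. */
    static final class Subtopology {
        final String topic;      // Kafka topic to read, e.g. "event-a"
        final String schemaName; // schema shared by the source and the sink
        final String sinkPath;   // output directory for this event type

        Subtopology(String topic, String schemaName, String sinkPath) {
            this.topic = topic;
            this.schemaName = schemaName;
            this.sinkPath = sinkPath;
        }

        @Override
        public String toString() {
            return "source(" + topic + ") -> validate -> sink(" + sinkPath + ") ["
                    + schemaName + "]";
        }
    }

    /** Builds one subtopology per (schema, topic) entry, in map iteration order. */
    static List<Subtopology> plan(Map<String, String> schemaToTopic, String outputBase) {
        List<Subtopology> result = new ArrayList<>();
        for (Map.Entry<String, String> entry : schemaToTopic.entrySet()) {
            String topic = entry.getValue();
            // Each event type gets its own directory, so every file has one schema.
            result.add(new Subtopology(topic, entry.getKey(), outputBase + "/" + topic));
        }
        return result;
    }
}
```

With a LinkedHashMap holding GeneratedClassA -> event-a and GeneratedClassB ->
event-b, plan(map, "s3://bucket/events") yields two entries, the first of
which prints as
source(event-a) -> validate -> sink(s3://bucket/events/event-a) [GeneratedClassA].
Adding an event type then means adding one map entry and redeploying.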

You have one project where all validations reside, yet you'd have almost no
overhead to process a given source of eventType. The downside of that
approach is, of course, that each new event type requires a redeployment,
but that seems like what you'd want to do anyhow.

Best,

Arvid

On Sat, Jan 18, 2020 at 2:08 PM aj <aj...@gmail.com> wrote:

> Thanks, Arvid.
>
> 1. I like your approach, as I can write a single consumer and put the data
> in S3 in Parquet format. The only challenge is that there are extra columns
> that are always going to be null, as each record holds only one type of event.
>
> 2. If I go with a separate schema, I am not sure how I can solve it using a
> single generalized consumer. My understanding so far is that I have to
> write a consumer for each type of event. Each consumer will read the whole
> data, filter out its respective events, and pass that stream to a sink.
> But this does not look like a scalable solution: as new events keep being
> added, I have to write a consumer for each new type.
>
>
> DataStreamSource<GenericRecord> input = env
>         .addSource(
>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>                         new
> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                         config).setStartFromEarliest());
>
> Example:
>
> 1st consumer:
>
> DataStreamSource<GenericRecord> input = env.addSource(
>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                 config).setStartFromEarliest());
> // keep only the records whose event_name is "a"
> DataStream<GenericRecord> aInput =
>         input.filter(r -> "a".equals(String.valueOf(r.get("event_name"))));
>
> 2nd consumer:
>
> DataStreamSource<GenericRecord> input = env.addSource(
>         new FlinkKafkaConsumer010<GenericRecord>(topics,
>                 new KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                 config).setStartFromEarliest());
> // keep only the records whose event_name is "b"
> DataStream<GenericRecord> bInput =
>         input.filter(r -> "b".equals(String.valueOf(r.get("event_name"))));
>
>
> Can you help me solve this using a single consumer, as I do not want
> to write a separate consumer for each type of schema?
>
> For example, this is my consumer that contains different types of records.
>
> DataStreamSource<GenericRecord> input = env
>         .addSource(
>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>                         new
> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                         config).setStartFromEarliest());
>
> Now I cannot write this stream directly, as there is no common schema for
> the records in this stream. So one possible way I am thinking of is:
>
> 1. Can I create multiple streams from this stream using keyBy on
> "event_name" and then write each stream separately?
>
> I just want to know whether this is possible.
>
>
> Thanks,
> Anuj
>

Re: Flink ParquetAvroWriters Sink

Posted by Arvid Heise <ar...@ververica.com>.
Hi Anuj,

I originally understood that you would like to store data in the same Kafka
topic and also want to store it in the same Parquet file. In the past, I
mostly used the schema registry with Kafka Streams, where you could only
store one schema for the key and one for the value. To use different record
types in the same Kafka topic, you had to disable schema compatibility checks
and just store the schemas as different versions under the same subject.

Your approach is much better. You can ensure full schema compatibility.
Nevertheless, it still shares the same drawback that consumption is much
harder (using GenericRecord is proof of that) if you want to read/write
everything into the same place. Also, you will never be able to write one
consistent file, as a file can only have one schema (in both Avro and
Parquet).
So you only have two options:
* keep schemas separated, but then you also need to write separate files
per record type.
* have a common schema (either my outlined approach or any other wrapper
schema).
The approach with a common schema only makes sense if you want to write
everything into one table/Kafka topic.
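
For the common-schema option, the envelope proposed earlier in this thread has
one nullable field per event type and cannot by itself enforce that exactly
one of them is set, so that check would fall to the producer or consumer code.
A minimal sketch in plain Java, with the envelope modelled as a map from field
name to payload; EnvelopeCheck and singleSetField are hypothetical names and
not part of Avro or Flink.

```java
import java.util.Map;
import java.util.Optional;

final class EnvelopeCheck {

    /**
     * Returns the name of the only non-null field, or an empty Optional
     * when no field or more than one field is set.
     */
    static Optional<String> singleSetField(Map<String, Object> envelope) {
        String found = null;
        for (Map.Entry<String, Object> entry : envelope.entrySet()) {
            if (entry.getValue() != null) {
                if (found != null) {
                    return Optional.empty(); // a second subtype is set: invalid
                }
                found = entry.getKey();
            }
        }
        return Optional.ofNullable(found);
    }
}
```

An envelope with type1 = null and type2 set returns Optional.of("type2"); one
with both set, or with neither set, returns an empty Optional.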

However, in the last mail you pointed out that you actually want to store
the record types separately. In that case, you should keep everything
separated and have one sink per type, each getting the respective schema.
Note that you'd need to fetch the schema manually from the schema registry
when creating the query, as you need to pass it to the sink.
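
A sketch of how each per-type sink could derive the Hive-style directory
requested in the original question (event_name=/year=/month=/day=). It uses
only the JDK; HivePartitionPath and bucketId are hypothetical names, and it
assumes the event timestamp is epoch milliseconds interpreted in UTC. In a
Flink job this logic would presumably live in the getBucketId method of a
custom BucketAssigner passed to the file sink.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

final class HivePartitionPath {

    /** Builds "event_name=<name>/year=YYYY/month=MM/day=DD" for one record. */
    static String bucketId(String eventName, long epochMillis) {
        // Assumption: timestamps are epoch milliseconds, partitioned by UTC date.
        ZonedDateTime time = Instant.ofEpochMilli(epochMillis).atZone(ZoneOffset.UTC);
        // Locale.ROOT keeps the digits ASCII regardless of the JVM default locale.
        return String.format(Locale.ROOT,
                "event_name=%s/year=%04d/month=%02d/day=%02d",
                eventName, time.getYear(), time.getMonthValue(), time.getDayOfMonth());
    }
}
```

For example, bucketId("a", 1579182761000L) gives
event_name=a/year=2020/month=01/day=16, which Hive can pick up as a partition
of a table defined on the event_name=a directory.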

Btw, do you actually need to write all events into one Kafka topic?
The only real use case is to preserve the time order per key. Everything
else is much more complicated than storing events individually.

Best,

Arvid


On Thu, Jan 16, 2020 at 3:39 PM aj <aj...@gmail.com> wrote:

> Hi Arvid,
> Thanks for the quick response. I am new to this Avro design so can you
> please help me understand and design for my use case.
>
> I have use case like this :
>
> 1. we have an app where a lot of action happened from the user side.
> 2. for each action we collect some set of information that defined using
> some key-value pairs. This information we want to define as proper schemas
> so that we maintain the proper format and not push random data.
> 3. So we are defining for each action a schema and register in the schema
> registry using  topic+record.name as the subject .
> 4. So I do not think the producer side has any issue as whenever we push
> the event to Kafka we register a new schema with the above subject.
>
> Example :
>
> {
> event_name : "a"
> "timestamp":
> "properties"  :[
>   "key-1 : "val-1"
>    "key-2 : "val-2"
> ]
> }
>
> {
> event_name : "b"
> "timestamp":
> "properties"  :[
>   "key-3 : "val-3"
>    "key-4 : "val-4"
> ]
> }
>
> Now I  have a consumer that will parse the data by fetching the schema
> from schema registry and deserialize in the generic record streams.
>
> Why you think it will break as I am always deserializing with writer
> schema only.
>
> As you suggested to keep an envelope Avro schema and not separate schema
> for each type of event that I am generating. I have some doubts about that:
>
> 1. How I enforce a schema on each event as it subtypes in the main schema.
> so when I am getting a JSON event of type "a" how I enforce and convert it
> to subschema type of "a" and push to Kafka.
> 2. I want to create a separate hive table for each of the events so when I
> write this data and lets says I have 20 events than for 19 columns I am
> getting null values always in data.
>
> Please help me in doing this right way. It will be a great help and
> learning for me.
>
> Thanks,
> Anuj
>
>
>
>
>
>
>
> On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com> wrote:
>
>> Hi Anuj,
>>
>> you should always avoid having records with different schemas in the same
>> topic/dataset. You will break the compatibility features of the schema
>> registry and your consumer/producer code is always hard to maintain.
>>
>> A common and scalable way to avoid it is to use some kind of envelope
>> format.
>>
>> {
>>   "namespace": "example",
>>   "name": "Envelope",
>>   "type": "record",
>>   "fields": [
>>     {
>>       "name": "type1",
>>       "type": ["null", {
>>         "type": "record",
>>         "fields": [ ... ]
>>       }],
>>       "default": null
>>     },
>>     {
>>       "name": "type2",
>>       "type": ["null", {
>>         "type": "record",
>>         "fields": [ ... ]
>>       }],
>>       "default": null
>>     }
>>   ]
>> }
>>
>> This envelope is evolvable (arbitrary addition/removal of wrapped types,
>> which by themselves can be evolved), and adds only a little overhead (1
>> byte per subtype). The downside is that you cannot enforce that exactly one
>> of the subtypes is set.
>>
>> This schema is fully compatible with the schema registry, so no need to
>> parse anything manually.
>>
>> This schema can easily be used with Parquet. If you can't change the
>> input format anymore, you can at least use that approach on your output.
>>
>> Best,
>>
>> Arvid
>>
>
> --
> Thanks & Regards,
> Anuj Jain
> Mob. : +91- 8588817877
> Skype : anuj.jain07
> <http://www.oracle.com/>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>

Re: Flink ParquetAvroWriters Sink

Posted by aj <aj...@gmail.com>.
Hi Arvid,
Thanks for the quick response. I am new to Avro schema design, so could
you please help me understand how to design this for my use case?

My use case is as follows:

1. We have an app in which users perform a lot of actions.
2. For each action we collect a set of information defined as key-value
pairs. We want to define this information with proper schemas so that we
keep a consistent format and do not push random data.
3. So we define a schema for each action and register it in the schema
registry using topic+record.name as the subject.
4. So I do not think the producer side has any issue, since whenever we
push an event to Kafka we register a new schema under the above subject.

Example :

{
  "event_name": "a",
  "timestamp": ...,
  "properties": {
    "key-1": "val-1",
    "key-2": "val-2"
  }
}

{
  "event_name": "b",
  "timestamp": ...,
  "properties": {
    "key-3": "val-3",
    "key-4": "val-4"
  }
}

Now I have a consumer that fetches the schema from the schema registry and
deserializes the data into a stream of GenericRecords.

Why do you think it will break, given that I always deserialize with the
writer schema only?
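
To make "deserializing with the writer schema" concrete: as far as I
understand, the Confluent serializer prefixes every message with a magic
byte (0) and a 4-byte big-endian schema id, and the deserializer uses that
id to fetch the writer schema from the registry. A minimal sketch of
reading that id, using only the JDK (the class and method names are made up
for illustration):

```java
import java.nio.ByteBuffer;

public class WireFormat {

    // Confluent wire format: 1 magic byte (0), then a 4-byte big-endian
    // schema id, then the Avro-encoded payload.
    static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message); // big-endian by default
        if (buf.remaining() < 5 || buf.get() != 0) {
            throw new IllegalArgumentException("not in Confluent wire format");
        }
        return buf.getInt();
    }

    public static void main(String[] args) {
        // Magic byte, schema id 42, then one dummy payload byte.
        byte[] message = {0, 0, 0, 0, 42, 2};
        System.out.println(schemaId(message));
    }
}
```

So each record carries its own writer schema id, regardless of which record
type it is.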

You suggested keeping a single envelope Avro schema rather than a separate
schema for each type of event that I generate. I have some doubts about
that:

1. How do I enforce a schema on each event when it is a subtype in the main
schema? When I get a JSON event of type "a", how do I validate it, convert
it to the subschema for type "a", and push it to Kafka?
2. I want to create a separate Hive table for each of the events. If I
write this data and, let's say, I have 20 events, then 19 of the 20 columns
will always be null in the data.

Please help me do this the right way. It would be a great help and a good
learning experience for me.

Thanks,
Anuj







On Thu, Jan 16, 2020 at 7:30 PM Arvid Heise <ar...@ververica.com> wrote:

> Hi Anuj,
>
> you should always avoid having records with different schemas in the same
> topic/dataset. You will break the compatibility features of the schema
> registry and your consumer/producer code is always hard to maintain.
>
> A common and scalable way to avoid it is to use some kind of envelope
> format.
>
> {
>   "namespace": "example",
>   "name": "Envelope",
>   "type": "record",
>   "fields": [
>     {
>       "name": "type1",
>       "type": ["null", {
>         "type": "record",
>         "fields": [ ... ]
>       }],
>       "default": null
>     },
>     {
>       "name": "type2",
>       "type": ["null", {
>         "type": "record",
>         "fields": [ ... ]
>       }],
>       "default": null
>     }
>   ]
> }
>
> This envelope is evolvable (arbitrary addition/removal of wrapped types,
> which by themselves can be evolved), and adds only a little overhead (1
> byte per subtype). The downside is that you cannot enforce that exactly one
> of the subtypes is set.
>
> This schema is fully compatible with the schema registry, so no need to
> parse anything manually.
>
> This schema can easily be used with Parquet. If you can't change the input
> format anymore, you can at least use that approach on your output.
>
> Best,
>
> Arvid
>
> On Thu, Jan 16, 2020 at 2:53 PM aj <aj...@gmail.com> wrote:
>
>> Hi All,
>>
>> I have a use case where I am getting a different set of Avro records in
>> Kafka. I am using the schema registry to store Avro schema. One topic can
>> also have different types of records.
>>
>> Now I have created a GenericRecord Stream using kafkaAvroDeseralizer by
>> defining custom
>> Deserializer class like this
>>
>> @Override
>> public GenericRecord deserialize(
>> byte[] messageKey, byte[] message, String topic, int partition, long
>> offset) {
>> checkInitialized();
>> return (GenericRecord) inner.deserialize(topic, message);
>> }
>>
>> private void checkInitialized() {
>> if (inner == null) {
>> Map<String, Object> props = new HashMap<>();
>> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
>> registryUrl);
>> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
>> SchemaRegistryClient client =
>> new CachedSchemaRegistryClient(
>> registryUrl,
>> AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
>> inner = new KafkaAvroDeserializer(client, props);
>> }
>> }
>>
>>
>> And this is my consumer code :
>>
>> DataStreamSource<GenericRecord> input = env
>>         .addSource(
>>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>>                         new
>> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>>                         config).setStartFromEarliest());
>>
>> Now I want to write this stream partition on
>> *event_name="a"/year=/month=/day=* in parquet format so that I can
>> expose hive tables directly on top of this data.
>> event_name is common field for all types of records that I am getting in
>> Kafka.
>> I am stuck as parquet writer needs a schema to write but my different
>> records have different schemas  So how do I write this stream in s3 in
>> above partition format.
>>
>>
>> Thanks & Regards,
>> Anuj Jain
>>
>>
>>
>> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>>
>

-- 
Thanks & Regards,
Anuj Jain

Re: Flink ParquetAvroWriters Sink

Posted by Arvid Heise <ar...@ververica.com>.
Hi Anuj,

You should always avoid having records with different schemas in the same
topic/dataset. Doing so breaks the compatibility features of the schema
registry, and your consumer/producer code will always be hard to maintain.

A common and scalable way to avoid it is to use some kind of envelope
format.

{
  "namespace": "example",
  "name": "Envelope",
  "type": "record",
  "fields": [
    {
      "name": "type1",
      "type": ["null", {
        "type": "record",
        "name": "Type1",
        "fields": [ ... ]
      }],
      "default": null
    },
    {
      "name": "type2",
      "type": ["null", {
        "type": "record",
        "name": "Type2",
        "fields": [ ... ]
      }],
      "default": null
    }
  ]
}

This envelope is evolvable (arbitrary addition/removal of wrapped types,
which by themselves can be evolved), and adds only a little overhead (1
byte per subtype). The downside is that you cannot enforce that exactly one
of the subtypes is set.
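
Since the schema cannot express the "exactly one subtype" rule, it would
have to be checked in application code before producing. A minimal sketch,
assuming the envelope is available as a plain Map from field name to value
(a stand-in for a GenericRecord; the class and method names are made up for
illustration):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class EnvelopeCheck {

    // Returns the name of the single non-null subtype field.
    // Throws if zero or more than one subtype field is set.
    static String singleSubtype(Map<String, Object> envelope) {
        List<String> set = new ArrayList<>();
        for (Map.Entry<String, Object> field : envelope.entrySet()) {
            if (field.getValue() != null) {
                set.add(field.getKey());
            }
        }
        if (set.size() != 1) {
            throw new IllegalArgumentException(
                    "expected exactly one subtype, found " + set);
        }
        return set.get(0);
    }

    public static void main(String[] args) {
        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("type1", null);
        envelope.put("type2", Map.of("key-3", "val-3"));
        System.out.println(singleSubtype(envelope));
    }
}
```

The returned field name can also serve as the event type when routing
records downstream.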

This schema is fully compatible with the schema registry, so no need to
parse anything manually.

This schema can easily be used with Parquet. If you can't change the input
format anymore, you can at least use that approach on your output.
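
For the output layout asked about in the original question, the directory
of each record can be derived from its event name and timestamp. A rough
sketch of that string computation only. It assumes the timestamp is in
epoch milliseconds and that partitions are cut in UTC, neither of which is
stated in this thread, and the class and method names are made up. In Flink
this logic would presumably live in a custom BucketAssigner passed to the
StreamingFileSink.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Locale;

public class PartitionPath {

    // Builds a Hive-style partition path such as
    // event_name=a/year=2020/month=01/day=16.
    // Assumption: timestampMillis is epoch milliseconds, interpreted in UTC.
    static String bucketId(String eventName, long timestampMillis) {
        ZonedDateTime t =
                Instant.ofEpochMilli(timestampMillis).atZone(ZoneOffset.UTC);
        return String.format(Locale.ROOT,
                "event_name=%s/year=%04d/month=%02d/day=%02d",
                eventName, t.getYear(), t.getMonthValue(), t.getDayOfMonth());
    }

    public static void main(String[] args) {
        // 1579182761000L is 2020-01-16T13:52:41Z.
        System.out.println(bucketId("a", 1579182761000L));
    }
}
```

With one directory tree per event_name, each Hive table can point at its
own prefix.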

Best,

Arvid

On Thu, Jan 16, 2020 at 2:53 PM aj <aj...@gmail.com> wrote:

> Hi All,
>
> I have a use case where I am getting a different set of Avro records in
> Kafka. I am using the schema registry to store Avro schema. One topic can
> also have different types of records.
>
> Now I have created a GenericRecord Stream using kafkaAvroDeseralizer by
> defining custom
> Deserializer class like this
>
> @Override
> public GenericRecord deserialize(
> byte[] messageKey, byte[] message, String topic, int partition, long
> offset) {
> checkInitialized();
> return (GenericRecord) inner.deserialize(topic, message);
> }
>
> private void checkInitialized() {
> if (inner == null) {
> Map<String, Object> props = new HashMap<>();
> props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
> registryUrl);
> props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false);
> SchemaRegistryClient client =
> new CachedSchemaRegistryClient(
> registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
> inner = new KafkaAvroDeserializer(client, props);
> }
> }
>
>
> And this is my consumer code :
>
> DataStreamSource<GenericRecord> input = env
>         .addSource(
>                 new FlinkKafkaConsumer010<GenericRecord>(topics,
>                         new
> KafkaGenericAvroDeserializationSchema(schemaRegistryUrl),
>                         config).setStartFromEarliest());
>
> Now I want to write this stream partition on
> *event_name="a"/year=/month=/day=* in parquet format so that I can expose
> hive tables directly on top of this data.
> event_name is common field for all types of records that I am getting in
> Kafka.
> I am stuck as parquet writer needs a schema to write but my different
> records have different schemas  So how do I write this stream in s3 in
> above partition format.
>
>
> Thanks & Regards,
> Anuj Jain
>
>
>
> <http://www.cse.iitm.ac.in/%7Eanujjain/>
>