Posted to user@flink.apache.org by Steve Whelan <sw...@jwplayer.com> on 2020/02/04 06:13:22 UTC

Performance issue with RegistryAvroSerializationSchema

Hi,

I'm running Flink v1.9. I backported the commit adding serialization
support for Confluent's schema registry[1]. Using the code as is, I saw a
nearly 50% drop in peak throughput for my job compared to using
*AvroRowSerializationSchema*.

Looking at the code, *RegistryAvroSerializationSchema.serialize()* executes:


public byte[] serialize(T object) {
  checkAvroInitialized();

  if (object == null) {
    return null;
  } else {
    try {
      Encoder encoder = getEncoder();
      schemaCoderProvider.get()
          .writeSchema(getSchema(), getOutputStream());
      getDatumWriter().write(object, encoder);
      encoder.flush();
      byte[] bytes = getOutputStream().toByteArray();
      getOutputStream().reset();
      return bytes;
    } catch (IOException e) {
      throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
    }
  }
}


This runs for every single message, and *ConfluentSchemaRegistryCoder.writeSchema()*
attempts to register the schema each time.


public void writeSchema(Schema schema, OutputStream out) throws IOException {
  try {
    int registeredId = schemaRegistryClient.register(subject, schema);
    out.write(CONFLUENT_MAGIC_BYTE);
    byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(registeredId).array();
    out.write(schemaIdBytes);
  } catch (RestClientException e) {
    throw new IOException("Could not register schema in registry", e);
  }
}


It's making an HTTP request to the Schema Registry for every single
message. Since the output schema does not change over the course of a
streaming job, it seems you should only need to register the schema once.

I moved the schema registration call into
*RegistryAvroSerializationSchema.checkAvroInitialized()* and added a helper
function, called from *RegistryAvroSerializationSchema.serialize()*, that
writes the magic byte and schema id bytes. After this change, the job's
performance returned to levels comparable to using
*AvroRowSerializationSchema*.
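
For reference, a rough sketch of that workaround; the field and helper names
below (cachedSchemaId, registerSchema, writeSchemaPrefix) are illustrative
only, not the actual Flink API:

private SchemaCoder schemaCoder;
private Integer cachedSchemaId;

protected void checkAvroInitialized() {
  if (schemaCoder == null) {
    schemaCoder = schemaCoderProvider.get();
    // Register (or look up) the output schema exactly once, not per message.
    cachedSchemaId = registerSchema(getSchema());   // hypothetical helper
  }
}

// Hypothetical helper: writes the Confluent magic byte and the 4-byte
// schema id that prefix every serialized Avro payload.
private void writeSchemaPrefix(OutputStream out) throws IOException {
  out.write(CONFLUENT_MAGIC_BYTE);
  out.write(ByteBuffer.allocate(4).putInt(cachedSchemaId).array());
}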

Am I right in thinking this was perhaps a design flaw and not intentionally
done?


[1]
https://github.com/apache/flink/commit/37a818ce8714adf14153587bf99c0900e5af42b7#diff-c60754c7ce564f4229c22913d783c339

Re: Performance issue with RegistryAvroSerializationSchema

Posted by Robert Metzger <rm...@apache.org>.
Steve,
thanks a lot for looking into this closer!

Let's discuss the resolution of the issue in the ticket Dawid has created:
https://issues.apache.org/jira/browse/FLINK-15941

Best,
Robert

Re: Performance issue with RegistryAvroSerializationSchema

Posted by Steve Whelan <sw...@jwplayer.com>.
Robert,

You are correct that it is using a *CachedSchemaRegistryClient* object, so
*schemaRegistryClient*.register() should check the cache before sending a
request to the Registry. However, with debug logging enabled on my Registry,
I can see a request being sent for every serialized message, which means the
cache consulted by *schemaRegistryClient*.register() was empty.

By adding some more debug logging, I think I found the issue within
*RegistryAvroSerializationSchema.serialize():*

public byte[] serialize(T object) {
  checkAvroInitialized();

  if (object == null) {
    return null;
  } else {
    try {
      Encoder encoder = getEncoder();
      schemaCoderProvider.get()   // get() creates a new instance of {@link SchemaCoder}[1]
          .writeSchema(getSchema(), getOutputStream());
      getDatumWriter().write(object, encoder);
      encoder.flush();
      byte[] bytes = getOutputStream().toByteArray();
      getOutputStream().reset();
      return bytes;
    } catch (IOException e) {
      throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
    }
  }
}


This *schemaCoderProvider*.get() call creates a new instance of
*SchemaCoder* every time, instead of reusing the one that was instantiated
inside *RegistryAvroSerializationSchema*.checkAvroInitialized(). This means
we get an object with a new, empty cache on every call, so
*schemaRegistryClient*.register() falls back to an HTTP request to the
Registry.

Simply changing the above line to:

schemaCoder.writeSchema(getSchema(), getOutputStream());


solved the issue. Since *RegistryAvroSerializationSchema*.checkAvroInitialized()
is called first inside *RegistryAvroSerializationSchema*.serialize(), we
do not have to worry about the *schemaCoder* object being null. Happy to
open a PR for the ticket created if this makes sense.
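
For clarity, a sketch of the full serialize() body with that one-line change
applied (otherwise identical to the code above):

public byte[] serialize(T object) {
  checkAvroInitialized();

  if (object == null) {
    return null;
  } else {
    try {
      Encoder encoder = getEncoder();
      // Reuse the SchemaCoder created once in checkAvroInitialized(), so its
      // cached registry client (and its cache) is shared across all messages.
      schemaCoder.writeSchema(getSchema(), getOutputStream());
      getDatumWriter().write(object, encoder);
      encoder.flush();
      byte[] bytes = getOutputStream().toByteArray();
      getOutputStream().reset();
      return bytes;
    } catch (IOException e) {
      throw new WrappingRuntimeException("Failed to serialize schema registry.", e);
    }
  }
}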


[1]
https://github.com/apache/flink/blob/37a818ce8714adf14153587bf99c0900e5af42b7/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/SchemaCoder.java


Re: Performance issue with RegistryAvroSerializationSchema

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi Steve,

I think your observation is correct. If I am not mistaken, we should use
*schemaRegistryClient.getId(subject, schema)* instead of
*schemaRegistryClient.register(subject, schema)*. The former should
perform an HTTP request only if the schema is not in the cache.
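
A rough, untested sketch of what *ConfluentSchemaRegistryCoder.writeSchema()*
could look like with that change (it assumes the schema is already registered
under the subject, since getId() only looks ids up):

public void writeSchema(Schema schema, OutputStream out) throws IOException {
  try {
    // getId() hits the registry only when the (subject, schema) pair is not
    // already in the client-side cache.
    int schemaId = schemaRegistryClient.getId(subject, schema);
    out.write(CONFLUENT_MAGIC_BYTE);
    byte[] schemaIdBytes = ByteBuffer.allocate(4).putInt(schemaId).array();
    out.write(schemaIdBytes);
  } catch (RestClientException e) {
    throw new IOException("Could not retrieve schema id from registry", e);
  }
}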

I created an issue to track it
https://issues.apache.org/jira/browse/FLINK-15941

Would you maybe like to check it and prepare a fix for it ;) ?

Best,

Dawid



Re: Performance issue with RegistryAvroSerializationSchema

Posted by Robert Metzger <rm...@apache.org>.
Hi,
thanks a lot for your message. It's certainly not intentional to do an HTTP
request for every single message :)

Isn't the *schemaRegistryClient *an instance of CachedSchemaRegistryClient,
which, as the name says, caches?
Can you check with a debugger at runtime what registry client is used, and
if there are indeed no cache hits?
Alternatively, you could check the log of the schema registry service.

Best,
Robert

