You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Ramana <ra...@gmail.com> on 2022/07/21 11:13:07 UTC

Decompressing RMQ streaming messages

Hi - We have a requirement to read the compressed messages emitting out of
RabbitMQ and to have them processed using PyFlink. However, I am not
finding any out of the box functionality in PyFlink which can help
decompress the messages.

Could anybody help me with an example of how to go about this?

Appreciate any help here.

Thanks

~Venkat

Re: Decompressing RMQ streaming messages

Posted by Francis Conroy <fr...@switchdin.com>.
Hi Venkat,

I guess you're using another compression algorithm which isn't zlib, you'll
have to adapt the code to work with your algorithm of choice.

Kind regards,
Francis

On Fri, 22 Jul 2022 at 17:27, Ramana <ra...@gmail.com> wrote:

> Hi Francis - Thanks for the snippet. I tried using the same, however I get
> an error.
>
> Following is the error -
>
> java.util.zip.DataFormatException: incorrect header check.
>
> I see multiple errors, i beleive for every message i am seeing this stack
> trace?
>
> Any idea as to what could be causing this?
>
> Thanks
> Venkat
>
> On Fri, Jul 22, 2022, 06:05 Francis Conroy <fr...@switchdin.com>
> wrote:
>
>> Hi Venkat,
>>
>> there's nothing that I know of, but I've written a zlib decompressor for
>> our payloads which was pretty straightforward.
>>
>> public class ZlibDeserializationSchema extends AbstractDeserializationSchema<byte[]> {
>>     @Override
>>     public byte[] deserialize(byte[] message) throws IOException {
>>         Inflater decompressor = new Inflater();
>>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>>         decompressor.setInput(message);
>>         byte[] buffer = new byte[1024];
>>         int len=0;
>>         do {
>>             try {
>>                 len = decompressor.inflate(buffer);
>>             } catch (DataFormatException e) {
>>                 e.printStackTrace();
>>             }
>>             bos.write(buffer, 0, len);
>>         } while (len > 0);
>>         decompressor.end();
>>         bos.close();
>>         return bos.toByteArray();
>>     }
>> }
>>
>> hope that helps.
>>
>> On Thu, 21 Jul 2022 at 21:13, Ramana <ra...@gmail.com> wrote:
>>
>>> Hi - We have a requirement to read the compressed messages emitting out
>>> of RabbitMQ and to have them processed using PyFlink. However, I am not
>>> finding any out of the box functionality in PyFlink which can help
>>> decompress the messages.
>>>
>>> Could anybody help me with an example of how to go about this?
>>>
>>> Appreciate any help here.
>>>
>>> Thanks
>>>
>>> ~Venkat
>>>
>>>
>> This email and any attachments are proprietary and confidential and are
>> intended solely for the use of the individual to whom it is addressed. Any
>> views or opinions expressed are solely those of the author and do not
>> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
>> received this email in error, please let us know immediately by reply email
>> and delete it from your system. You may not use, disseminate, distribute or
>> copy this message nor disclose its contents to anyone.
>> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
>> Australia
>>
>

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia

Re: Decompressing RMQ streaming messages

Posted by Ramana <ra...@gmail.com>.
Hi Francis - Thanks for the snippet. I tried using the same, however I get
an error.

Following is the error -

java.util.zip.DataFormatException: incorrect header check.

I see multiple errors, i beleive for every message i am seeing this stack
trace?

Any idea as to what could be causing this?

Thanks
Venkat

On Fri, Jul 22, 2022, 06:05 Francis Conroy <fr...@switchdin.com>
wrote:

> Hi Venkat,
>
> there's nothing that I know of, but I've written a zlib decompressor for
> our payloads which was pretty straightforward.
>
> public class ZlibDeserializationSchema extends AbstractDeserializationSchema<byte[]> {
>     @Override
>     public byte[] deserialize(byte[] message) throws IOException {
>         Inflater decompressor = new Inflater();
>         ByteArrayOutputStream bos = new ByteArrayOutputStream();
>         decompressor.setInput(message);
>         byte[] buffer = new byte[1024];
>         int len=0;
>         do {
>             try {
>                 len = decompressor.inflate(buffer);
>             } catch (DataFormatException e) {
>                 e.printStackTrace();
>             }
>             bos.write(buffer, 0, len);
>         } while (len > 0);
>         decompressor.end();
>         bos.close();
>         return bos.toByteArray();
>     }
> }
>
> hope that helps.
>
> On Thu, 21 Jul 2022 at 21:13, Ramana <ra...@gmail.com> wrote:
>
>> Hi - We have a requirement to read the compressed messages emitting out
>> of RabbitMQ and to have them processed using PyFlink. However, I am not
>> finding any out of the box functionality in PyFlink which can help
>> decompress the messages.
>>
>> Could anybody help me with an example of how to go about this?
>>
>> Appreciate any help here.
>>
>> Thanks
>>
>> ~Venkat
>>
>>
> This email and any attachments are proprietary and confidential and are
> intended solely for the use of the individual to whom it is addressed. Any
> views or opinions expressed are solely those of the author and do not
> necessarily reflect or represent those of SwitchDin Pty Ltd. If you have
> received this email in error, please let us know immediately by reply email
> and delete it from your system. You may not use, disseminate, distribute or
> copy this message nor disclose its contents to anyone.
> SwitchDin Pty Ltd (ABN 29 154893857) PO Box 1165, Newcastle NSW 2300
> Australia
>

Re: Decompressing RMQ streaming messages

Posted by Francis Conroy <fr...@switchdin.com>.
Hi Venkat,

there's nothing that I know of, but I've written a zlib decompressor for
our payloads which was pretty straightforward.

public class ZlibDeserializationSchema extends
AbstractDeserializationSchema<byte[]> {
    @Override
    public byte[] deserialize(byte[] message) throws IOException {
        Inflater decompressor = new Inflater();
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        decompressor.setInput(message);
        byte[] buffer = new byte[1024];
        int len=0;
        do {
            try {
                len = decompressor.inflate(buffer);
            } catch (DataFormatException e) {
                e.printStackTrace();
            }
            bos.write(buffer, 0, len);
        } while (len > 0);
        decompressor.end();
        bos.close();
        return bos.toByteArray();
    }
}

hope that helps.

On Thu, 21 Jul 2022 at 21:13, Ramana <ra...@gmail.com> wrote:

> Hi - We have a requirement to read the compressed messages emitting out of
> RabbitMQ and to have them processed using PyFlink. However, I am not
> finding any out of the box functionality in PyFlink which can help
> decompress the messages.
>
> Could anybody help me with an example of how to go about this?
>
> Appreciate any help here.
>
> Thanks
>
> ~Venkat
>
>

-- 
This email and any attachments are proprietary and confidential and are 
intended solely for the use of the individual to whom it is addressed. Any 
views or opinions expressed are solely those of the author and do not 
necessarily reflect or represent those of SwitchDin Pty Ltd. If you have 
received this email in error, please let us know immediately by reply email 
and delete it from your system. You may not use, disseminate, distribute or 
copy this message nor disclose its contents to anyone. 
SwitchDin Pty Ltd 
(ABN 29 154893857) PO Box 1165, Newcastle NSW 2300 Australia