Posted to dev@kafka.apache.org by Loren Abigail Sion <si...@gmail.com> on 2021/03/17 15:17:25 UTC

Inquiry about usage of Kafka Compression

Good day,


We're currently implementing our application with the Kafka
compression type ZStandard (zstd).

However, during testing the consumer encountered this error:

 [ERROR] (consumer-1) org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer - Container exception
org.apache.kafka.common.KafkaException: Received exception when fetching the next record from dp.-------------------. If needed, please seek past the record to continue consumption.
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1228)
        at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1400(Fetcher.java:1096)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:544)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:505)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1225)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1188)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1123)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:532)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.common.KafkaException: java.lang.ExceptionInInitializerError: Cannot unpack libzstd-jni

*Here are the versions of the producer and consumer:*

Producer Kafka Client Version (using ZStandard compression): 2.5.1
Consumer Kafka Client Version: 2.1.0

Could you help us identify what caused this error? Do we need to upgrade
the version on the consumer side?


Best Regards,

Loren Sion

Re: Inquiry about usage of Kafka Compression

Posted by Loren Abigail Sion <si...@gmail.com>.
Hi Dongjin,


Thank you for your very informative reply!

Here are the answers to your questions:


   - What is the platform you are using?
      - We're currently using Linux 4.4.0-89-generic #112-Ubuntu.


   - Do you have enough space or permission for the temp directory?
      - Yes, our application has permission, and approximately 7.7 GB
      of space is available in the temp directory.


   - Does the error occur repeatedly?
      - No, the error has occurred only once so far, and we have not been
      able to reproduce it.




Best regards,

Loren Sion





Re: Inquiry about usage of Kafka Compression

Posted by Dongjin Lee <do...@apache.org>.
Hi Loren,


This error occurs when your application fails to load the libzstd-jni
library file from the temporary directory.



   - zstd-jni v1.3.5-4 (included in Kafka 2.1.0):
   https://github.com/luben/zstd-jni/blob/v1.3.5-4/src/main/java/com/github/luben/zstd/util/Native.java#L101
   - zstd-jni latest:
   https://github.com/luben/zstd-jni/blob/master/src/main/java/com/github/luben/zstd/util/Native.java#L137


In short, zstd-jni works as follows:



   1. It bundles the shared libraries for all supported platforms in its jar.
   2. When initializing, it copies the appropriate library into the temp
   directory.
   3. It loads the extracted library into memory.
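
The three steps above can be sketched in Java roughly as follows. This is only
an illustration of the general unpack-and-load pattern, not the actual zstd-jni
code: the class name NativeUnpackSketch, the method unpack, and the resource
path /linux/amd64/libexample.so are all hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class NativeUnpackSketch {

    // Copies a classpath resource into a new file in java.io.tmpdir and returns its path.
    // Throws IOException if the resource is missing or the temp file cannot be written.
    public static Path unpack(Class<?> owner, String resourcePath, String suffix)
            throws IOException {
        try (InputStream in = owner.getResourceAsStream(resourcePath)) {
            if (in == null) {
                throw new IOException("Resource not found: " + resourcePath);
            }
            Path target = Files.createTempFile("native-sketch-", suffix);
            target.toFile().deleteOnExit();
            Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
            return target;
        }
    }

    public static void main(String[] args) {
        // Hypothetical resource path; a real library would choose it by OS and architecture.
        String resource = "/linux/amd64/libexample.so";
        try {
            Path lib = unpack(NativeUnpackSketch.class, resource, ".so");
            // System.load requires an absolute path to the shared library.
            System.load(lib.toAbsolutePath().toString());
            System.out.println("Loaded " + lib);
        } catch (IOException | UnsatisfiedLinkError e) {
            // Either the unpack step or the load step failed.
            System.out.println("Could not unpack or load the library: " + e);
        }
    }
}
```

In a pattern like this, a missing resource, a full disk, or a temp directory
that is not writable would all surface at the unpack step.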


So, it would be good to check:



   1. What is the platform you are using?
   2. Do you have enough space or permission for the temp directory?
   3. Does the error occur repeatedly?


Please check these and let me know. (Disclaimer: I added zstd support
to Apache Kafka.)
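
The first two checks can be run from inside the affected JVM with a small
program like the one below. It is a minimal sketch using only standard JDK
calls; the class name TempDirCheck, the method isUsable, and the 16 MB
threshold are assumptions chosen for illustration, not values taken from
zstd-jni or Kafka.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class TempDirCheck {

    // Returns true if a file can be created in dir and at least
    // minFreeBytes of usable space are reported for its file store.
    public static boolean isUsable(Path dir, long minFreeBytes) {
        try {
            if (!Files.isDirectory(dir) || !Files.isWritable(dir)) {
                return false;
            }
            // Actually create and delete a file, since permission checks alone can mislead.
            Path probe = Files.createTempFile(dir, "probe-", ".tmp");
            Files.delete(probe);
            return Files.getFileStore(dir).getUsableSpace() >= minFreeBytes;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Path tmp = Paths.get(System.getProperty("java.io.tmpdir"));
        System.out.println("os.name        = " + System.getProperty("os.name"));
        System.out.println("os.arch        = " + System.getProperty("os.arch"));
        System.out.println("java.io.tmpdir = " + tmp);
        // 16 MB is an arbitrary illustrative threshold.
        System.out.println("usable         = " + isUsable(tmp, 16L * 1024 * 1024));
    }
}
```

Running it as the same user and with the same JVM options as the consumer
matters, because java.io.tmpdir can be overridden with -Djava.io.tmpdir.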


Thanks,

Dongjin


-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*



github: https://github.com/dongjinleekr
keybase: https://keybase.io/dongjinleekr
linkedin: https://kr.linkedin.com/in/dongjinleekr
speakerdeck: https://speakerdeck.com/dongjin