Posted to users@kafka.apache.org by Jérôme BAROTIN <je...@barotin.fr> on 2015/10/17 10:21:15 UTC

Kafka 0.8.2.2 doesn't want to compress in snappy

Hello,

I want to check whether snappy compression works correctly with the Java
Kafka client.

To test this, I set up a small program. It generates 1024 messages of
readable data, 1024 bytes each, sends them to three new topics (one per
compression type), and then I check the size of each topic directly on
the broker filesystem.

Here is the program's Java code:

    package unit_test.testCompress;

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;


    /**
     * Can be used to run some quick tests on compression.
     */
    public class TestCompress {

        public static void compress(String type, String version) {
            Map<String, Object> configs = new HashMap<String, Object>();
            configs.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
            configs.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            // "producer.type" is an old (Scala) producer setting and
            // "partitioner.class" is not a known config of the 0.8.2 Java
            // producer; both appear to be ignored (only a warning is logged)
            configs.put("producer.type", "async");
            configs.put("partitioner.class",
                "com.astellia.astkafkaproducer.RecordPartitioner");
            configs.put("compression.type", type);
            configs.put("bootstrap.servers", "kafka:9092");

            KafkaProducer<String, byte[]> producer =
                    new KafkaProducer<String, byte[]>(configs);

            // fill a 1 KB buffer with readable data (fixed seed, so every
            // run and every topic gets the same payload)
            Random r = new Random(15415485);
            int size = 1024; // 1 KB
            byte[] buffer = new byte[size];
            for (int i = 0; i < size; i++) {
                // r.nextInt(26) stays in [0, 25]; the original
                // r.nextInt() % 26 could go negative and yield non-letters
                buffer[i] = (byte) ('A' + r.nextInt(26));
            }
            buffer[size - 1] = 0;

            // send 1024 copies of the buffer to a topic named after the codec
            for (int i = 0; i < size; i++) {
                producer.send(new ProducerRecord<String, byte[]>(
                        "unit_test_compress_" + version + "_" + type, buffer));
            }

            producer.close();
        }

        public static void main(String[] args) {
            String version = "v10";
            compress("snappy", version);
            compress("gzip", version);
            compress("none", version);
        }
    }
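
With these settings the program writes to the topics
unit_test_compress_v10_snappy, unit_test_compress_v10_gzip and
unit_test_compress_v10_none; note this relies on the broker auto-creating
topics (auto.create.topics.enable, which defaults to true).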


I compile this code with the following Maven POM file:

      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
                                   http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <groupId>unit_test</groupId>
      <artifactId>testCompress</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>

      <name>testCompress</name>
      <url>http://maven.apache.org</url>

      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>

      <dependencies>
         <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.2</version>
        </dependency>
      </dependencies>
    </project>
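
(Note: the kafka_2.10 artifact pulls in the new producer classes and
snappy-java transitively; for a producer-only test like this, depending
directly on org.apache.kafka:kafka-clients:0.8.2.2 would also be enough.)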

This program runs fine on my computer.

But when I check the disk space taken by each topic directly on my Kafka
broker with the command-line tool "du", I find:
- the gzip topic is compressed: OK
- the none topic is not compressed: OK
- but the snappy topic is not compressed: not OK
(a screenshot can be found here: http://i.stack.imgur.com/7W1f5.png)

I also looked at the stored log file with vi, and the data is still in
clear text.
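
(In case it's useful for reproducing: the codec actually stored for each
message can also be printed with the DumpLogSegments tool that ships with
the broker; the segment path below is only an example:

    bin/kafka-run-class.sh kafka.tools.DumpLogSegments --deep-iteration \
        --files /path/to/kafka-logs/unit_test_compress_v10_snappy-0/00000000000000000000.log

each dumped message should report a compresscodec field.)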

I'm aware of this issue in Kafka 0.8.2.1:
https://issues.apache.org/jira/browse/KAFKA-2189

But I'm using Kafka 0.8.2.2 on the producer side and Kafka 0.8.2.1 on the
broker.

I checked the Snappy dependency as well: I'm using version 1.1.1.7.

Do you have any idea how to enable snappy compression in Kafka?
Did I forget a parameter that enables snappy compression?
Are my Kafka versions incompatible?

Re: Kafka 0.8.2.2 doesn't want to compress in snappy

Posted by Gwen Shapira <gw...@confluent.io>.
We compress a batch of messages together, but we need to give each
message its own offset (and to know its key if we want to use topic
compaction), so the broker un-compresses and re-compresses every batch.

We are working on an improvement to add relative offsets, which will
allow the broker to skip this re-compression.
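
To make that concrete, here is a rough, illustrative sketch (not the
actual broker code; the record layout, offsets, and sizes are all invented
for illustration) of why assigning per-message offsets forces a
decompress/recompress cycle, using the snappy-java API directly:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.xerial.snappy.Snappy;

    public class OffsetRewriteSketch {
        public static void main(String[] args) throws IOException {
            // a toy "batch": two records of [8-byte offset][1-byte payload],
            // offsets still unassigned (-1), as a producer would send them
            ByteBuffer batch = ByteBuffer.allocate(18);
            batch.putLong(-1L).put((byte) 'a');
            batch.putLong(-1L).put((byte) 'b');
            byte[] wrapper = Snappy.compress(batch.array());

            // broker side: the offsets live inside the compressed payload,
            // so the broker has to decompress to assign them...
            ByteBuffer records = ByteBuffer.wrap(Snappy.uncompress(wrapper));
            long nextOffset = 42;
            for (int pos = 0; pos + 9 <= records.capacity(); pos += 9) {
                records.putLong(pos, nextOffset++);
            }

            // ...and then re-compress before writing the batch to the log
            byte[] rewritten = Snappy.compress(records.array());
            System.out.println("re-compressed: " + rewritten.length + " bytes");
        }
    }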

Gwen

On Tue, Oct 20, 2015 at 3:18 AM, Jérôme BAROTIN <je...@barotin.fr> wrote:
> [...]
> I still don't understand why the broker needs to compress the data again
> if compression is already done in the producer. Do you have a link to a
> wiki page, documentation, or anything else about that?
> [rest of the quoted thread snipped]

Re: Kafka 0.8.2.2 doesn't want to compress in snappy

Posted by Jérôme BAROTIN <je...@barotin.fr>.
Hi,

I was 100% sure that the Kafka broker didn't re-compress data, so I
didn't think I had to upgrade my broker to 0.8.2.2.

I tried the upgrade and it works now!

I still don't understand why the broker needs to compress the data again
if compression is already done in the producer. Do you have a link to a
wiki page, documentation, or anything else about that?

Anyway, thanks for your help in solving this.

Regards,

Jérôme

2015-10-20 1:26 GMT+02:00 Jun Rao <ju...@confluent.io>:
> You will need to upgrade the broker to 0.8.2.2. The broker currently
> re-compresses messages, and in 0.8.2.1 the snappy jar has a bug that
> causes data explosion. We fixed the snappy jar in 0.8.2.2, so if you
> upgrade the broker to 0.8.2.2, it will pick up the fixed snappy jar.
>
> Thanks,
>
> Jun
>
> On Sat, Oct 17, 2015 at 1:21 AM, Jérôme BAROTIN <je...@barotin.fr> wrote:
>
>> [original message quoted in full; snipped]

Re: Kafka 0.8.2.2 doesn't want to compress in snappy

Posted by Jun Rao <ju...@confluent.io>.
You will need to upgrade the broker to 0.8.2.2. The broker currently
re-compresses messages, and in 0.8.2.1 the snappy jar has a bug that
causes data explosion. We fixed the snappy jar in 0.8.2.2, so if you
upgrade the broker to 0.8.2.2, it will pick up the fixed snappy jar.
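
(If you want to confirm which snappy jar a broker is actually running,
check its libs directory: 0.8.2.1 ships the buggy snappy-java (1.1.1.6,
I believe), while 0.8.2.2 ships the fixed 1.1.1.7.)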

Thanks,

Jun

On Sat, Oct 17, 2015 at 1:21 AM, Jérôme BAROTIN <je...@barotin.fr> wrote:

> [original message quoted in full; snipped]

Re: Kafka 0.8.2.2 doesn't want to compress in snappy

Posted by Lukas Steiblys <lu...@doubledutch.me>.
Sounds similar to my experience right now. We have 110GB of JSON data that I
convert to Avro and upload to Kafka 0.8.2.2 using the 0.8.2.2 Java producer
with snappy compression. The logs in Kafka end up 53GB in size, which is way
bigger than I'd expect. I haven't had time to dig into this problem yet.
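
One thing I still need to rule out is batching: with the new producer each
batch is compressed as a unit, so the ratio you see depends a lot on how
many records end up in each batch. A minimal sketch of the knobs I plan to
look at (the values are examples, not recommendations):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;

    public class BatchTuningSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "kafka:9092"); // example address
            props.put("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("compression.type", "snappy");
            // bytes per per-partition batch; larger batches compress better
            props.put("batch.size", "65536");
            // wait up to 50 ms for a batch to fill before sending
            props.put("linger.ms", "50");
            KafkaProducer<byte[], byte[]> producer =
                    new KafkaProducer<byte[], byte[]>(props);
            producer.close();
        }
    }

Many small, already-compact Avro records landing in small batches would
explain a worse-than-expected ratio.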

Lukas

-----Original Message----- 
From: Jérôme BAROTIN
Sent: Saturday, October 17, 2015 1:21 AM
To: users@kafka.apache.org
Subject: Kafka 0.8.2.2 doesn't want to compress in snappy

[original message quoted in full; snipped]