You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by Scott Wang <sc...@rumbleentertainment.com> on 2013/07/08 23:53:52 UTC

having problem with 0.8 gzip compression

I am testing with Kafka 0.8 beta and am having a problem receiving messages
in the consumer.  There is no error, so does anyone have any insights?  When
I comment out the "compression.codec" setting, everything works fine.

My producer:
public class TestKafka08Prod {

    public static void main(String [] args) {

        Producer<Integer, String> producer = null;
        try {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync");
            props.put("request.required.acks","1");
            props.put("compression.codec", "gzip");
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<Integer, String>(config);
            int j=0;
            for(int i=0; i<10; i++) {
                KeyedMessage<Integer, String> data = new
KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
"+System.currentTimeMillis());
                producer.send(data);

            }

        } catch (Exception e) {
            System.out.println("Error happened: ");
            e.printStackTrace();
        } finally {
            if(null != null) {
                producer.close();
            }

            System.out.println("Ened of Sending");
        }

        System.exit(0);
    }
}


My consumer:

/**
 * Test consumer for Kafka 0.8: connects through ZooKeeper at
 * localhost:2181/kafka_0_8, opens a single stream on "test-topic" and prints
 * every message it receives.
 */
public class TestKafka08Consumer {

    /**
     * Creates the high-level consumer connector, reads from one stream of
     * "test-topic" and prints each message to standard output.
     *
     * @param args unused
     * @throws UnknownHostException declared by the original signature; kept for compatibility
     * @throws SocketException declared by the original signature; kept for compatibility
     */
    public static void main(String [] args) throws UnknownHostException,
            SocketException {

        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
        props.put("group.id", "test08ConsumerId");
        // Use the 0.8 property names, consistent with "zookeeper.connect" above.
        // NOTE(review): the previous zk.sessiontimeout.ms / zk.synctime.ms /
        // autocommit.interval.ms keys are the 0.7 spellings — confirm against the
        // 0.8 consumer configuration reference.
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(props);

        ConsumerConnector consumerConnector =
                kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        try {
            String topic = "test-topic";
            // Request exactly one stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, Integer.valueOf(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumerConnector.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            while (it.hasNext()) {
                try {
                    // Decode explicitly as UTF-8 instead of the platform default charset.
                    String fromPlatform = new String(it.next().message(),
                            java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("The messages: "+fromPlatform);
                } catch(Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("SystemOut");
        } finally {
            // Release the connector's resources if the loop ends or setup fails.
            consumerConnector.shutdown();
        }
    }
}


Thanks

Fwd: having problem with 0.8 gzip compression

Posted by Scott Wang <sc...@rumbleentertainment.com>.
I am testing with Kafka 0.8 beta and am having a problem receiving messages
in the consumer.  There is no error, so does anyone have any insights?  When
I comment out the "compression.codec" setting, everything works fine.

My producer:
public class TestKafka08Prod {

    public static void main(String [] args) {

        Producer<Integer, String> producer = null;
        try {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync");
            props.put("request.required.acks","1");
            props.put("compression.codec", "gzip");
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<Integer, String>(config);
            int j=0;
            for(int i=0; i<10; i++) {
                KeyedMessage<Integer, String> data = new
KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
"+System.currentTimeMillis());
                producer.send(data);

            }

        } catch (Exception e) {
            System.out.println("Error happened: ");
            e.printStackTrace();
        } finally {
            if(null != null) {
                producer.close();
            }

            System.out.println("Ened of Sending");
        }

        System.exit(0);
    }
}


My consumer:

/**
 * Test consumer for Kafka 0.8: connects through ZooKeeper at
 * localhost:2181/kafka_0_8, opens a single stream on "test-topic" and prints
 * every message it receives.
 */
public class TestKafka08Consumer {

    /**
     * Creates the high-level consumer connector, reads from one stream of
     * "test-topic" and prints each message to standard output.
     *
     * @param args unused
     * @throws UnknownHostException declared by the original signature; kept for compatibility
     * @throws SocketException declared by the original signature; kept for compatibility
     */
    public static void main(String [] args) throws UnknownHostException,
            SocketException {

        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
        props.put("group.id", "test08ConsumerId");
        // Use the 0.8 property names, consistent with "zookeeper.connect" above.
        // NOTE(review): the previous zk.sessiontimeout.ms / zk.synctime.ms /
        // autocommit.interval.ms keys are the 0.7 spellings — confirm against the
        // 0.8 consumer configuration reference.
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(props);

        ConsumerConnector consumerConnector =
                kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        try {
            String topic = "test-topic";
            // Request exactly one stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, Integer.valueOf(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumerConnector.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);

            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            while (it.hasNext()) {
                try {
                    // Decode explicitly as UTF-8 instead of the platform default charset.
                    String fromPlatform = new String(it.next().message(),
                            java.nio.charset.StandardCharsets.UTF_8);
                    System.out.println("The messages: "+fromPlatform);
                } catch(Exception e) {
                    e.printStackTrace();
                }
            }
            System.out.println("SystemOut");
        } finally {
            // Release the connector's resources if the loop ends or setup fails.
            consumerConnector.shutdown();
        }
    }
}


Thanks

Re: having problem with 0.8 gzip compression

Posted by Scott Wang <sc...@rumbleentertainment.com>.
OK, the problem is solved.  I think it was because some of the jar files
that I was using were old: I was building the producer and consumer
under the 0.7 environment and had only swapped out the kafka jar file.   Now
I have created a whole new environment and pulled in all the jar files from
0.8.  That seems to have solved my 0.8 gzip problem.   Thank you for all the
help.

Re: having problem with 0.8 gzip compression

Posted by Scott Wang <sc...@rumbleentertainment.com>.
Joel,

Would you mind pointing me to how I can enable trace logs in
the producer and broker?

Thanks,
Scott


On Wed, Jul 10, 2013 at 5:33 PM, Joel Koshy <jj...@gmail.com> wrote:

> Weird - I tried your exact code and it worked for me (although I was
> using 0.8 head and not the beta). Can you re-run with trace logs
> enabled in your producer and paste that output? Broker logs also if
> you can?
>
> Thanks,
>
> Joel
>
> On Wed, Jul 10, 2013 at 10:23 AM, Scott Wang
> <sc...@rumbleentertainment.com> wrote:
> > Jun,
> >
> > I did a test this morning and got a very interesting result with you
> > command.  I started by wipe all the log files and clean up all zookeeper
> > data files.
> >
> > Once I restarted both server, producer and consumer then execute your
> > command, what I got is a empty log as following:
> >
> > Dumping /Users/scott/Temp/kafka/test-topic-0/00000000000000000000.log
> > Starting offset: 0
> >
> > One observation, the 00000000000000000000.index file was getting huge but
> > there was nothing in 00000000000000000000.log file.
> >
> > Thanks,
> > Scott
> >
> >
> >
> >
> > On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> >> Could you run the following command on one of the log files of your
> topic
> >> and attach the output?
> >>
> >> bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> >> /tmp/kafka-logs/testtopic-0/00000000000000000000.log
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Tue, Jul 9, 2013 at 3:23 PM, Scott Wang <
> >> scott.wang@rumbleentertainment.com> wrote:
> >>
> >> > Another piece of information, the snappy compression also does not
> work.
> >> >
> >> > Thanks,
> >> > Scott
> >> >
> >> >
> >> > On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
> >> > scott.wang@rumbleentertainment.com> wrote:
> >> >
> >> > > I just try it and it still not showing up, thanks for looking into
> >> this.
> >> > >
> >> > > Thanks,
> >> > > Scott
> >> > >
> >> > >
> >> > > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <ju...@gmail.com> wrote:
> >> > >
> >> > >> Could you try starting the consumer first (and enable gzip in the
> >> > >> producer)?
> >> > >>
> >> > >> Thanks,
> >> > >>
> >> > >> Jun
> >> > >>
> >> > >>
> >> > >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
> >> > >> scott.wang@rumbleentertainment.com> wrote:
> >> > >>
> >> > >> > No, I did not start the consumer before the producer.  I actually
> >> > >> started
> >> > >> > the producer first and nothing showed up in the consumer unless I
> >> > >> commented
> >> > >> > out this line -- props.put("compression.codec", "gzip").    If I
> >> > >> commented
> >> > >> > out the compression codec, everything just works.
> >> > >> >
> >> > >> >
> >> > >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com>
> wrote:
> >> > >> >
> >> > >> > > Did you start the consumer before the producer? Be default, the
> >> > >> consumer
> >> > >> > > gets only the new data?
> >> > >> > >
> >> > >> > > Thanks,
> >> > >> > >
> >> > >> > > Jun
> >> > >> > >
> >> > >> > >
> >> > >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> >> > >> > > scott.wang@rumbleentertainment.com> wrote:
> >> > >> > >
> >> > >> > > > I am testing with Kafka 0.8 beta and having problem of
> receiving
> >> > >> > message
> >> > >> > > in
> >> > >> > > > consumer.  There is no error so does anyone have any
> insights.
> >> > >>  When I
> >> > >> > > > commented out the "compression.code" everything works fine.
> >> > >> > > >
> >> > >> > > > My producer:
> >> > >> > > > public class TestKafka08Prod {
> >> > >> > > >
> >> > >> > > >     public static void main(String [] args) {
> >> > >> > > >
> >> > >> > > >         Producer<Integer, String> producer = null;
> >> > >> > > >         try {
> >> > >> > > >             Properties props = new Properties();
> >> > >> > > >             props.put("metadata.broker.list",
> "localhost:9092");
> >> > >> > > >             props.put("serializer.class",
> >> > >> > > > "kafka.serializer.StringEncoder");
> >> > >> > > >             props.put("producer.type", "sync");
> >> > >> > > >             props.put("request.required.acks","1");
> >> > >> > > >             props.put("compression.codec", "gzip");
> >> > >> > > >             ProducerConfig config = new
> ProducerConfig(props);
> >> > >> > > >             producer = new Producer<Integer, String>(config);
> >> > >> > > >             int j=0;
> >> > >> > > >             for(int i=0; i<10; i++) {
> >> > >> > > >                 KeyedMessage<Integer, String> data = new
> >> > >> > > > KeyedMessage<Integer, String>("test-topic", "test-message:
> "+i+"
> >> > >> > > > "+System.currentTimeMillis());
> >> > >> > > >                 producer.send(data);
> >> > >> > > >
> >> > >> > > >             }
> >> > >> > > >
> >> > >> > > >         } catch (Exception e) {
> >> > >> > > >             System.out.println("Error happened: ");
> >> > >> > > >             e.printStackTrace();
> >> > >> > > >         } finally {
> >> > >> > > >             if(null != null) {
> >> > >> > > >                 producer.close();
> >> > >> > > >             }
> >> > >> > > >
> >> > >> > > >             System.out.println("Ened of Sending");
> >> > >> > > >         }
> >> > >> > > >
> >> > >> > > >         System.exit(0);
> >> > >> > > >     }
> >> > >> > > > }
> >> > >> > > >
> >> > >> > > >
> >> > >> > > > My consumer:
> >> > >> > > >
> >> > >> > > > public class TestKafka08Consumer {
> >> > >> > > >     public static void main(String [] args) throws
> >> > >> > UnknownHostException,
> >> > >> > > > SocketException {
> >> > >> > > >
> >> > >> > > >         Properties props = new Properties();
> >> > >> > > >         props.put("zookeeper.connect",
> >> > "localhost:2181/kafka_0_8");
> >> > >> > > >         props.put("group.id", "test08ConsumerId");
> >> > >> > > >         props.put("zk.sessiontimeout.ms", "4000");
> >> > >> > > >         props.put("zk.synctime.ms", "2000");
> >> > >> > > >         props.put("autocommit.interval.ms", "1000");
> >> > >> > > >
> >> > >> > > >         ConsumerConfig consumerConfig = new
> >> ConsumerConfig(props);
> >> > >> > > >
> >> > >> > > >         ConsumerConnector consumerConnector =
> >> > >> > > >
> >> > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> >> > >> > > >
> >> > >> > > >         String topic = "test-topic";
> >> > >> > > >         Map<String, Integer> topicCountMap = new
> HashMap<String,
> >> > >> > > > Integer>();
> >> > >> > > >         topicCountMap.put(topic, new Integer(1));
> >> > >> > > >         Map<String, List<KafkaStream<byte[], byte[]>>>
> >> > consumerMap =
> >> > >> > > > consumerConnector.createMessageStreams(topicCountMap);
> >> > >> > > >         KafkaStream<byte[], byte[]> stream =
> >> > >> > > >  consumerMap.get(topic).get(0);
> >> > >> > > >
> >> > >> > > >         ConsumerIterator<byte[], byte[]> it =
> stream.iterator();
> >> > >> > > >
> >> > >> > > >         int counter=0;
> >> > >> > > >         while(it.hasNext()) {
> >> > >> > > >             try {
> >> > >> > > >                 String fromPlatform = new
> >> > >> String(it.next().message());
> >> > >> > > >                 System.out.println("The messages:
> >> "+fromPlatform);
> >> > >> > > >             } catch(Exception e) {
> >> > >> > > >                 e.printStackTrace();
> >> > >> > > >             }
> >> > >> > > >         }
> >> > >> > > >         System.out.println("SystemOut");
> >> > >> > > >     }
> >> > >> > > > }
> >> > >> > > >
> >> > >> > > >
> >> > >> > > > Thanks
> >> > >> > > >
> >> > >> > >
> >> > >> >
> >> > >>
> >> > >
> >> > >
> >> >
> >>
>

Re: having problem with 0.8 gzip compression

Posted by Joel Koshy <jj...@gmail.com>.
Weird - I tried your exact code and it worked for me (although I was
using 0.8 head and not the beta). Can you re-run with trace logs
enabled in your producer and paste that output? Broker logs also if
you can?

Thanks,

Joel

On Wed, Jul 10, 2013 at 10:23 AM, Scott Wang
<sc...@rumbleentertainment.com> wrote:
> Jun,
>
> I did a test this morning and got a very interesting result with you
> command.  I started by wipe all the log files and clean up all zookeeper
> data files.
>
> Once I restarted both server, producer and consumer then execute your
> command, what I got is a empty log as following:
>
> Dumping /Users/scott/Temp/kafka/test-topic-0/00000000000000000000.log
> Starting offset: 0
>
> One observation, the 00000000000000000000.index file was getting huge but
> there was nothing in 00000000000000000000.log file.
>
> Thanks,
> Scott
>
>
>
>
> On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao <ju...@gmail.com> wrote:
>
>> Could you run the following command on one of the log files of your topic
>> and attach the output?
>>
>> bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
>> /tmp/kafka-logs/testtopic-0/00000000000000000000.log
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Tue, Jul 9, 2013 at 3:23 PM, Scott Wang <
>> scott.wang@rumbleentertainment.com> wrote:
>>
>> > Another piece of information, the snappy compression also does not work.
>> >
>> > Thanks,
>> > Scott
>> >
>> >
>> > On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
>> > scott.wang@rumbleentertainment.com> wrote:
>> >
>> > > I just try it and it still not showing up, thanks for looking into
>> this.
>> > >
>> > > Thanks,
>> > > Scott
>> > >
>> > >
>> > > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <ju...@gmail.com> wrote:
>> > >
>> > >> Could you try starting the consumer first (and enable gzip in the
>> > >> producer)?
>> > >>
>> > >> Thanks,
>> > >>
>> > >> Jun
>> > >>
>> > >>
>> > >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
>> > >> scott.wang@rumbleentertainment.com> wrote:
>> > >>
>> > >> > No, I did not start the consumer before the producer.  I actually
>> > >> started
>> > >> > the producer first and nothing showed up in the consumer unless I
>> > >> commented
>> > >> > out this line -- props.put("compression.codec", "gzip").    If I
>> > >> commented
>> > >> > out the compression codec, everything just works.
>> > >> >
>> > >> >
>> > >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
>> > >> >
>> > >> > > Did you start the consumer before the producer? Be default, the
>> > >> consumer
>> > >> > > gets only the new data?
>> > >> > >
>> > >> > > Thanks,
>> > >> > >
>> > >> > > Jun
>> > >> > >
>> > >> > >
>> > >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
>> > >> > > scott.wang@rumbleentertainment.com> wrote:
>> > >> > >
>> > >> > > > I am testing with Kafka 0.8 beta and having problem of receiving
>> > >> > message
>> > >> > > in
>> > >> > > > consumer.  There is no error so does anyone have any insights.
>> > >>  When I
>> > >> > > > commented out the "compression.code" everything works fine.
>> > >> > > >
>> > >> > > > My producer:
>> > >> > > > public class TestKafka08Prod {
>> > >> > > >
>> > >> > > >     public static void main(String [] args) {
>> > >> > > >
>> > >> > > >         Producer<Integer, String> producer = null;
>> > >> > > >         try {
>> > >> > > >             Properties props = new Properties();
>> > >> > > >             props.put("metadata.broker.list", "localhost:9092");
>> > >> > > >             props.put("serializer.class",
>> > >> > > > "kafka.serializer.StringEncoder");
>> > >> > > >             props.put("producer.type", "sync");
>> > >> > > >             props.put("request.required.acks","1");
>> > >> > > >             props.put("compression.codec", "gzip");
>> > >> > > >             ProducerConfig config = new ProducerConfig(props);
>> > >> > > >             producer = new Producer<Integer, String>(config);
>> > >> > > >             int j=0;
>> > >> > > >             for(int i=0; i<10; i++) {
>> > >> > > >                 KeyedMessage<Integer, String> data = new
>> > >> > > > KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
>> > >> > > > "+System.currentTimeMillis());
>> > >> > > >                 producer.send(data);
>> > >> > > >
>> > >> > > >             }
>> > >> > > >
>> > >> > > >         } catch (Exception e) {
>> > >> > > >             System.out.println("Error happened: ");
>> > >> > > >             e.printStackTrace();
>> > >> > > >         } finally {
>> > >> > > >             if(null != null) {
>> > >> > > >                 producer.close();
>> > >> > > >             }
>> > >> > > >
>> > >> > > >             System.out.println("Ened of Sending");
>> > >> > > >         }
>> > >> > > >
>> > >> > > >         System.exit(0);
>> > >> > > >     }
>> > >> > > > }
>> > >> > > >
>> > >> > > >
>> > >> > > > My consumer:
>> > >> > > >
>> > >> > > > public class TestKafka08Consumer {
>> > >> > > >     public static void main(String [] args) throws
>> > >> > UnknownHostException,
>> > >> > > > SocketException {
>> > >> > > >
>> > >> > > >         Properties props = new Properties();
>> > >> > > >         props.put("zookeeper.connect",
>> > "localhost:2181/kafka_0_8");
>> > >> > > >         props.put("group.id", "test08ConsumerId");
>> > >> > > >         props.put("zk.sessiontimeout.ms", "4000");
>> > >> > > >         props.put("zk.synctime.ms", "2000");
>> > >> > > >         props.put("autocommit.interval.ms", "1000");
>> > >> > > >
>> > >> > > >         ConsumerConfig consumerConfig = new
>> ConsumerConfig(props);
>> > >> > > >
>> > >> > > >         ConsumerConnector consumerConnector =
>> > >> > > >
>> > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>> > >> > > >
>> > >> > > >         String topic = "test-topic";
>> > >> > > >         Map<String, Integer> topicCountMap = new HashMap<String,
>> > >> > > > Integer>();
>> > >> > > >         topicCountMap.put(topic, new Integer(1));
>> > >> > > >         Map<String, List<KafkaStream<byte[], byte[]>>>
>> > consumerMap =
>> > >> > > > consumerConnector.createMessageStreams(topicCountMap);
>> > >> > > >         KafkaStream<byte[], byte[]> stream =
>> > >> > > >  consumerMap.get(topic).get(0);
>> > >> > > >
>> > >> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>> > >> > > >
>> > >> > > >         int counter=0;
>> > >> > > >         while(it.hasNext()) {
>> > >> > > >             try {
>> > >> > > >                 String fromPlatform = new
>> > >> String(it.next().message());
>> > >> > > >                 System.out.println("The messages:
>> "+fromPlatform);
>> > >> > > >             } catch(Exception e) {
>> > >> > > >                 e.printStackTrace();
>> > >> > > >             }
>> > >> > > >         }
>> > >> > > >         System.out.println("SystemOut");
>> > >> > > >     }
>> > >> > > > }
>> > >> > > >
>> > >> > > >
>> > >> > > > Thanks
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>

Re: having problem with 0.8 gzip compression

Posted by Scott Wang <sc...@rumbleentertainment.com>.
Jun,

I did a test this morning and got a very interesting result with you
command.  I started by wipe all the log files and clean up all zookeeper
data files.

Once I restarted both server, producer and consumer then execute your
command, what I got is a empty log as following:

Dumping /Users/scott/Temp/kafka/test-topic-0/00000000000000000000.log
Starting offset: 0

One observation, the 00000000000000000000.index file was getting huge but
there was nothing in 00000000000000000000.log file.

Thanks,
Scott




On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao <ju...@gmail.com> wrote:

> Could you run the following command on one of the log files of your topic
> and attach the output?
>
> bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /tmp/kafka-logs/testtopic-0/00000000000000000000.log
>
> Thanks,
>
> Jun
>
>
> On Tue, Jul 9, 2013 at 3:23 PM, Scott Wang <
> scott.wang@rumbleentertainment.com> wrote:
>
> > Another piece of information, the snappy compression also does not work.
> >
> > Thanks,
> > Scott
> >
> >
> > On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
> > scott.wang@rumbleentertainment.com> wrote:
> >
> > > I just try it and it still not showing up, thanks for looking into
> this.
> > >
> > > Thanks,
> > > Scott
> > >
> > >
> > > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <ju...@gmail.com> wrote:
> > >
> > >> Could you try starting the consumer first (and enable gzip in the
> > >> producer)?
> > >>
> > >> Thanks,
> > >>
> > >> Jun
> > >>
> > >>
> > >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
> > >> scott.wang@rumbleentertainment.com> wrote:
> > >>
> > >> > No, I did not start the consumer before the producer.  I actually
> > >> started
> > >> > the producer first and nothing showed up in the consumer unless I
> > >> commented
> > >> > out this line -- props.put("compression.codec", "gzip").    If I
> > >> commented
> > >> > out the compression codec, everything just works.
> > >> >
> > >> >
> > >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
> > >> >
> > >> > > Did you start the consumer before the producer? Be default, the
> > >> consumer
> > >> > > gets only the new data?
> > >> > >
> > >> > > Thanks,
> > >> > >
> > >> > > Jun
> > >> > >
> > >> > >
> > >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> > >> > > scott.wang@rumbleentertainment.com> wrote:
> > >> > >
> > >> > > > I am testing with Kafka 0.8 beta and having problem of receiving
> > >> > message
> > >> > > in
> > >> > > > consumer.  There is no error so does anyone have any insights.
> > >>  When I
> > >> > > > commented out the "compression.code" everything works fine.
> > >> > > >
> > >> > > > My producer:
> > >> > > > public class TestKafka08Prod {
> > >> > > >
> > >> > > >     public static void main(String [] args) {
> > >> > > >
> > >> > > >         Producer<Integer, String> producer = null;
> > >> > > >         try {
> > >> > > >             Properties props = new Properties();
> > >> > > >             props.put("metadata.broker.list", "localhost:9092");
> > >> > > >             props.put("serializer.class",
> > >> > > > "kafka.serializer.StringEncoder");
> > >> > > >             props.put("producer.type", "sync");
> > >> > > >             props.put("request.required.acks","1");
> > >> > > >             props.put("compression.codec", "gzip");
> > >> > > >             ProducerConfig config = new ProducerConfig(props);
> > >> > > >             producer = new Producer<Integer, String>(config);
> > >> > > >             int j=0;
> > >> > > >             for(int i=0; i<10; i++) {
> > >> > > >                 KeyedMessage<Integer, String> data = new
> > >> > > > KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
> > >> > > > "+System.currentTimeMillis());
> > >> > > >                 producer.send(data);
> > >> > > >
> > >> > > >             }
> > >> > > >
> > >> > > >         } catch (Exception e) {
> > >> > > >             System.out.println("Error happened: ");
> > >> > > >             e.printStackTrace();
> > >> > > >         } finally {
> > >> > > >             if(null != null) {
> > >> > > >                 producer.close();
> > >> > > >             }
> > >> > > >
> > >> > > >             System.out.println("Ened of Sending");
> > >> > > >         }
> > >> > > >
> > >> > > >         System.exit(0);
> > >> > > >     }
> > >> > > > }
> > >> > > >
> > >> > > >
> > >> > > > My consumer:
> > >> > > >
> > >> > > > public class TestKafka08Consumer {
> > >> > > >     public static void main(String [] args) throws
> > >> > UnknownHostException,
> > >> > > > SocketException {
> > >> > > >
> > >> > > >         Properties props = new Properties();
> > >> > > >         props.put("zookeeper.connect",
> > "localhost:2181/kafka_0_8");
> > >> > > >         props.put("group.id", "test08ConsumerId");
> > >> > > >         props.put("zk.sessiontimeout.ms", "4000");
> > >> > > >         props.put("zk.synctime.ms", "2000");
> > >> > > >         props.put("autocommit.interval.ms", "1000");
> > >> > > >
> > >> > > >         ConsumerConfig consumerConfig = new
> ConsumerConfig(props);
> > >> > > >
> > >> > > >         ConsumerConnector consumerConnector =
> > >> > > >
> > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> > >> > > >
> > >> > > >         String topic = "test-topic";
> > >> > > >         Map<String, Integer> topicCountMap = new HashMap<String,
> > >> > > > Integer>();
> > >> > > >         topicCountMap.put(topic, new Integer(1));
> > >> > > >         Map<String, List<KafkaStream<byte[], byte[]>>>
> > consumerMap =
> > >> > > > consumerConnector.createMessageStreams(topicCountMap);
> > >> > > >         KafkaStream<byte[], byte[]> stream =
> > >> > > >  consumerMap.get(topic).get(0);
> > >> > > >
> > >> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > >> > > >
> > >> > > >         int counter=0;
> > >> > > >         while(it.hasNext()) {
> > >> > > >             try {
> > >> > > >                 String fromPlatform = new
> > >> String(it.next().message());
> > >> > > >                 System.out.println("The messages:
> "+fromPlatform);
> > >> > > >             } catch(Exception e) {
> > >> > > >                 e.printStackTrace();
> > >> > > >             }
> > >> > > >         }
> > >> > > >         System.out.println("SystemOut");
> > >> > > >     }
> > >> > > > }
> > >> > > >
> > >> > > >
> > >> > > > Thanks
> > >> > > >
> > >> > >
> > >> >
> > >>
> > >
> > >
> >
>

Re: having problem with 0.8 gzip compression

Posted by Jun Rao <ju...@gmail.com>.
Could you run the following command on one of the log files of your topic
and attach the output?

bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
/tmp/kafka-logs/testtopic-0/00000000000000000000.log

Thanks,

Jun


On Tue, Jul 9, 2013 at 3:23 PM, Scott Wang <
scott.wang@rumbleentertainment.com> wrote:

> Another piece of information, the snappy compression also does not work.
>
> Thanks,
> Scott
>
>
> On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
> scott.wang@rumbleentertainment.com> wrote:
>
> > I just try it and it still not showing up, thanks for looking into this.
> >
> > Thanks,
> > Scott
> >
> >
> > On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <ju...@gmail.com> wrote:
> >
> >> Could you try starting the consumer first (and enable gzip in the
> >> producer)?
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
> >> scott.wang@rumbleentertainment.com> wrote:
> >>
> >> > No, I did not start the consumer before the producer.  I actually
> >> started
> >> > the producer first and nothing showed up in the consumer unless I
> >> commented
> >> > out this line -- props.put("compression.codec", "gzip").    If I
> >> commented
> >> > out the compression codec, everything just works.
> >> >
> >> >
> >> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
> >> >
> >> > > Did you start the consumer before the producer? Be default, the
> >> consumer
> >> > > gets only the new data?
> >> > >
> >> > > Thanks,
> >> > >
> >> > > Jun
> >> > >
> >> > >
> >> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> >> > > scott.wang@rumbleentertainment.com> wrote:
> >> > >
> >> > > > I am testing with Kafka 0.8 beta and having problem of receiving
> >> > message
> >> > > in
> >> > > > consumer.  There is no error so does anyone have any insights.
> >>  When I
> >> > > > commented out the "compression.code" everything works fine.
> >> > > >
> >> > > > My producer:
> >> > > > public class TestKafka08Prod {
> >> > > >
> >> > > >     public static void main(String [] args) {
> >> > > >
> >> > > >         Producer<Integer, String> producer = null;
> >> > > >         try {
> >> > > >             Properties props = new Properties();
> >> > > >             props.put("metadata.broker.list", "localhost:9092");
> >> > > >             props.put("serializer.class",
> >> > > > "kafka.serializer.StringEncoder");
> >> > > >             props.put("producer.type", "sync");
> >> > > >             props.put("request.required.acks","1");
> >> > > >             props.put("compression.codec", "gzip");
> >> > > >             ProducerConfig config = new ProducerConfig(props);
> >> > > >             producer = new Producer<Integer, String>(config);
> >> > > >             int j=0;
> >> > > >             for(int i=0; i<10; i++) {
> >> > > >                 KeyedMessage<Integer, String> data = new
> >> > > > KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
> >> > > > "+System.currentTimeMillis());
> >> > > >                 producer.send(data);
> >> > > >
> >> > > >             }
> >> > > >
> >> > > >         } catch (Exception e) {
> >> > > >             System.out.println("Error happened: ");
> >> > > >             e.printStackTrace();
> >> > > >         } finally {
> >> > > >             if(null != null) {
> >> > > >                 producer.close();
> >> > > >             }
> >> > > >
> >> > > >             System.out.println("Ened of Sending");
> >> > > >         }
> >> > > >
> >> > > >         System.exit(0);
> >> > > >     }
> >> > > > }
> >> > > >
> >> > > >
> >> > > > My consumer:
> >> > > >
> >> > > > public class TestKafka08Consumer {
> >> > > >     public static void main(String [] args) throws
> >> > UnknownHostException,
> >> > > > SocketException {
> >> > > >
> >> > > >         Properties props = new Properties();
> >> > > >         props.put("zookeeper.connect",
> "localhost:2181/kafka_0_8");
> >> > > >         props.put("group.id", "test08ConsumerId");
> >> > > >         props.put("zk.sessiontimeout.ms", "4000");
> >> > > >         props.put("zk.synctime.ms", "2000");
> >> > > >         props.put("autocommit.interval.ms", "1000");
> >> > > >
> >> > > >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
> >> > > >
> >> > > >         ConsumerConnector consumerConnector =
> >> > > >
> kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> >> > > >
> >> > > >         String topic = "test-topic";
> >> > > >         Map<String, Integer> topicCountMap = new HashMap<String,
> >> > > > Integer>();
> >> > > >         topicCountMap.put(topic, new Integer(1));
> >> > > >         Map<String, List<KafkaStream<byte[], byte[]>>>
> consumerMap =
> >> > > > consumerConnector.createMessageStreams(topicCountMap);
> >> > > >         KafkaStream<byte[], byte[]> stream =
> >> > > >  consumerMap.get(topic).get(0);
> >> > > >
> >> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >> > > >
> >> > > >         int counter=0;
> >> > > >         while(it.hasNext()) {
> >> > > >             try {
> >> > > >                 String fromPlatform = new
> >> String(it.next().message());
> >> > > >                 System.out.println("The messages: "+fromPlatform);
> >> > > >             } catch(Exception e) {
> >> > > >                 e.printStackTrace();
> >> > > >             }
> >> > > >         }
> >> > > >         System.out.println("SystemOut");
> >> > > >     }
> >> > > > }
> >> > > >
> >> > > >
> >> > > > Thanks
> >> > > >
> >> > >
> >> >
> >>
> >
> >
>

Re: having problem with 0.8 gzip compression

Posted by Scott Wang <sc...@rumbleentertainment.com>.
Another piece of information: snappy compression does not work either.

Thanks,
Scott


On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
scott.wang@rumbleentertainment.com> wrote:

> I just try it and it still not showing up, thanks for looking into this.
>
> Thanks,
> Scott
>
>
> On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <ju...@gmail.com> wrote:
>
>> Could you try starting the consumer first (and enable gzip in the
>> producer)?
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
>> scott.wang@rumbleentertainment.com> wrote:
>>
>> > No, I did not start the consumer before the producer.  I actually
>> started
>> > the producer first and nothing showed up in the consumer unless I
>> commented
>> > out this line -- props.put("compression.codec", "gzip").    If I
>> commented
>> > out the compression codec, everything just works.
>> >
>> >
>> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
>> >
>> > > Did you start the consumer before the producer? Be default, the
>> consumer
>> > > gets only the new data?
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
>> > > scott.wang@rumbleentertainment.com> wrote:
>> > >
>> > > > I am testing with Kafka 0.8 beta and having problem of receiving
>> > message
>> > > in
>> > > > consumer.  There is no error so does anyone have any insights.
>>  When I
>> > > > commented out the "compression.code" everything works fine.
>> > > >
>> > > > My producer:
>> > > > public class TestKafka08Prod {
>> > > >
>> > > >     public static void main(String [] args) {
>> > > >
>> > > >         Producer<Integer, String> producer = null;
>> > > >         try {
>> > > >             Properties props = new Properties();
>> > > >             props.put("metadata.broker.list", "localhost:9092");
>> > > >             props.put("serializer.class",
>> > > > "kafka.serializer.StringEncoder");
>> > > >             props.put("producer.type", "sync");
>> > > >             props.put("request.required.acks","1");
>> > > >             props.put("compression.codec", "gzip");
>> > > >             ProducerConfig config = new ProducerConfig(props);
>> > > >             producer = new Producer<Integer, String>(config);
>> > > >             int j=0;
>> > > >             for(int i=0; i<10; i++) {
>> > > >                 KeyedMessage<Integer, String> data = new
>> > > > KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
>> > > > "+System.currentTimeMillis());
>> > > >                 producer.send(data);
>> > > >
>> > > >             }
>> > > >
>> > > >         } catch (Exception e) {
>> > > >             System.out.println("Error happened: ");
>> > > >             e.printStackTrace();
>> > > >         } finally {
>> > > >             if(null != null) {
>> > > >                 producer.close();
>> > > >             }
>> > > >
>> > > >             System.out.println("Ened of Sending");
>> > > >         }
>> > > >
>> > > >         System.exit(0);
>> > > >     }
>> > > > }
>> > > >
>> > > >
>> > > > My consumer:
>> > > >
>> > > > public class TestKafka08Consumer {
>> > > >     public static void main(String [] args) throws
>> > UnknownHostException,
>> > > > SocketException {
>> > > >
>> > > >         Properties props = new Properties();
>> > > >         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
>> > > >         props.put("group.id", "test08ConsumerId");
>> > > >         props.put("zk.sessiontimeout.ms", "4000");
>> > > >         props.put("zk.synctime.ms", "2000");
>> > > >         props.put("autocommit.interval.ms", "1000");
>> > > >
>> > > >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
>> > > >
>> > > >         ConsumerConnector consumerConnector =
>> > > > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>> > > >
>> > > >         String topic = "test-topic";
>> > > >         Map<String, Integer> topicCountMap = new HashMap<String,
>> > > > Integer>();
>> > > >         topicCountMap.put(topic, new Integer(1));
>> > > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>> > > > consumerConnector.createMessageStreams(topicCountMap);
>> > > >         KafkaStream<byte[], byte[]> stream =
>> > > >  consumerMap.get(topic).get(0);
>> > > >
>> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>> > > >
>> > > >         int counter=0;
>> > > >         while(it.hasNext()) {
>> > > >             try {
>> > > >                 String fromPlatform = new
>> String(it.next().message());
>> > > >                 System.out.println("The messages: "+fromPlatform);
>> > > >             } catch(Exception e) {
>> > > >                 e.printStackTrace();
>> > > >             }
>> > > >         }
>> > > >         System.out.println("SystemOut");
>> > > >     }
>> > > > }
>> > > >
>> > > >
>> > > > Thanks
>> > > >
>> > >
>> >
>>
>
>

Re: having problem with 0.8 gzip compression

Posted by Scott Wang <sc...@rumbleentertainment.com>.
I just tried it and the messages are still not showing up. Thanks for looking into this.

Thanks,
Scott


On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <ju...@gmail.com> wrote:

> Could you try starting the consumer first (and enable gzip in the
> producer)?
>
> Thanks,
>
> Jun
>
>
> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
> scott.wang@rumbleentertainment.com> wrote:
>
> > No, I did not start the consumer before the producer.  I actually started
> > the producer first and nothing showed up in the consumer unless I
> commented
> > out this line -- props.put("compression.codec", "gzip").    If I
> commented
> > out the compression codec, everything just works.
> >
> >
> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
> >
> > > Did you start the consumer before the producer? Be default, the
> consumer
> > > gets only the new data?
> > >
> > > Thanks,
> > >
> > > Jun
> > >
> > >
> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> > > scott.wang@rumbleentertainment.com> wrote:
> > >
> > > > I am testing with Kafka 0.8 beta and having problem of receiving
> > message
> > > in
> > > > consumer.  There is no error so does anyone have any insights.  When
> I
> > > > commented out the "compression.code" everything works fine.
> > > >
> > > > My producer:
> > > > public class TestKafka08Prod {
> > > >
> > > >     public static void main(String [] args) {
> > > >
> > > >         Producer<Integer, String> producer = null;
> > > >         try {
> > > >             Properties props = new Properties();
> > > >             props.put("metadata.broker.list", "localhost:9092");
> > > >             props.put("serializer.class",
> > > > "kafka.serializer.StringEncoder");
> > > >             props.put("producer.type", "sync");
> > > >             props.put("request.required.acks","1");
> > > >             props.put("compression.codec", "gzip");
> > > >             ProducerConfig config = new ProducerConfig(props);
> > > >             producer = new Producer<Integer, String>(config);
> > > >             int j=0;
> > > >             for(int i=0; i<10; i++) {
> > > >                 KeyedMessage<Integer, String> data = new
> > > > KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
> > > > "+System.currentTimeMillis());
> > > >                 producer.send(data);
> > > >
> > > >             }
> > > >
> > > >         } catch (Exception e) {
> > > >             System.out.println("Error happened: ");
> > > >             e.printStackTrace();
> > > >         } finally {
> > > >             if(null != null) {
> > > >                 producer.close();
> > > >             }
> > > >
> > > >             System.out.println("Ened of Sending");
> > > >         }
> > > >
> > > >         System.exit(0);
> > > >     }
> > > > }
> > > >
> > > >
> > > > My consumer:
> > > >
> > > > public class TestKafka08Consumer {
> > > >     public static void main(String [] args) throws
> > UnknownHostException,
> > > > SocketException {
> > > >
> > > >         Properties props = new Properties();
> > > >         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
> > > >         props.put("group.id", "test08ConsumerId");
> > > >         props.put("zk.sessiontimeout.ms", "4000");
> > > >         props.put("zk.synctime.ms", "2000");
> > > >         props.put("autocommit.interval.ms", "1000");
> > > >
> > > >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > > >
> > > >         ConsumerConnector consumerConnector =
> > > > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> > > >
> > > >         String topic = "test-topic";
> > > >         Map<String, Integer> topicCountMap = new HashMap<String,
> > > > Integer>();
> > > >         topicCountMap.put(topic, new Integer(1));
> > > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > > consumerConnector.createMessageStreams(topicCountMap);
> > > >         KafkaStream<byte[], byte[]> stream =
> > > >  consumerMap.get(topic).get(0);
> > > >
> > > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > > >
> > > >         int counter=0;
> > > >         while(it.hasNext()) {
> > > >             try {
> > > >                 String fromPlatform = new
> String(it.next().message());
> > > >                 System.out.println("The messages: "+fromPlatform);
> > > >             } catch(Exception e) {
> > > >                 e.printStackTrace();
> > > >             }
> > > >         }
> > > >         System.out.println("SystemOut");
> > > >     }
> > > > }
> > > >
> > > >
> > > > Thanks
> > > >
> > >
> >
>

Re: having problem with 0.8 gzip compression

Posted by Jun Rao <ju...@gmail.com>.
Could you try starting the consumer first (and enable gzip in the producer)?

Thanks,

Jun


On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
scott.wang@rumbleentertainment.com> wrote:

> No, I did not start the consumer before the producer.  I actually started
> the producer first and nothing showed up in the consumer unless I commented
> out this line -- props.put("compression.codec", "gzip").    If I commented
> out the compression codec, everything just works.
>
>
> On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > Did you start the consumer before the producer? Be default, the consumer
> > gets only the new data?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> > scott.wang@rumbleentertainment.com> wrote:
> >
> > > I am testing with Kafka 0.8 beta and having problem of receiving
> message
> > in
> > > consumer.  There is no error so does anyone have any insights.  When I
> > > commented out the "compression.code" everything works fine.
> > >
> > > My producer:
> > > public class TestKafka08Prod {
> > >
> > >     public static void main(String [] args) {
> > >
> > >         Producer<Integer, String> producer = null;
> > >         try {
> > >             Properties props = new Properties();
> > >             props.put("metadata.broker.list", "localhost:9092");
> > >             props.put("serializer.class",
> > > "kafka.serializer.StringEncoder");
> > >             props.put("producer.type", "sync");
> > >             props.put("request.required.acks","1");
> > >             props.put("compression.codec", "gzip");
> > >             ProducerConfig config = new ProducerConfig(props);
> > >             producer = new Producer<Integer, String>(config);
> > >             int j=0;
> > >             for(int i=0; i<10; i++) {
> > >                 KeyedMessage<Integer, String> data = new
> > > KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
> > > "+System.currentTimeMillis());
> > >                 producer.send(data);
> > >
> > >             }
> > >
> > >         } catch (Exception e) {
> > >             System.out.println("Error happened: ");
> > >             e.printStackTrace();
> > >         } finally {
> > >             if(null != null) {
> > >                 producer.close();
> > >             }
> > >
> > >             System.out.println("Ened of Sending");
> > >         }
> > >
> > >         System.exit(0);
> > >     }
> > > }
> > >
> > >
> > > My consumer:
> > >
> > > public class TestKafka08Consumer {
> > >     public static void main(String [] args) throws
> UnknownHostException,
> > > SocketException {
> > >
> > >         Properties props = new Properties();
> > >         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
> > >         props.put("group.id", "test08ConsumerId");
> > >         props.put("zk.sessiontimeout.ms", "4000");
> > >         props.put("zk.synctime.ms", "2000");
> > >         props.put("autocommit.interval.ms", "1000");
> > >
> > >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > >
> > >         ConsumerConnector consumerConnector =
> > > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> > >
> > >         String topic = "test-topic";
> > >         Map<String, Integer> topicCountMap = new HashMap<String,
> > > Integer>();
> > >         topicCountMap.put(topic, new Integer(1));
> > >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > consumerConnector.createMessageStreams(topicCountMap);
> > >         KafkaStream<byte[], byte[]> stream =
> > >  consumerMap.get(topic).get(0);
> > >
> > >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > >
> > >         int counter=0;
> > >         while(it.hasNext()) {
> > >             try {
> > >                 String fromPlatform = new String(it.next().message());
> > >                 System.out.println("The messages: "+fromPlatform);
> > >             } catch(Exception e) {
> > >                 e.printStackTrace();
> > >             }
> > >         }
> > >         System.out.println("SystemOut");
> > >     }
> > > }
> > >
> > >
> > > Thanks
> > >
> >
>

Re: having problem with 0.8 gzip compression

Posted by Scott Wang <sc...@rumbleentertainment.com>.
No, I did not start the consumer before the producer.  I actually started
the producer first and nothing showed up in the consumer unless I commented
out this line -- props.put("compression.codec", "gzip").    If I commented
out the compression codec, everything just works.


On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:

> Did you start the consumer before the producer? Be default, the consumer
> gets only the new data?
>
> Thanks,
>
> Jun
>
>
> On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> scott.wang@rumbleentertainment.com> wrote:
>
> > I am testing with Kafka 0.8 beta and having problem of receiving message
> in
> > consumer.  There is no error so does anyone have any insights.  When I
> > commented out the "compression.code" everything works fine.
> >
> > My producer:
> > public class TestKafka08Prod {
> >
> >     public static void main(String [] args) {
> >
> >         Producer<Integer, String> producer = null;
> >         try {
> >             Properties props = new Properties();
> >             props.put("metadata.broker.list", "localhost:9092");
> >             props.put("serializer.class",
> > "kafka.serializer.StringEncoder");
> >             props.put("producer.type", "sync");
> >             props.put("request.required.acks","1");
> >             props.put("compression.codec", "gzip");
> >             ProducerConfig config = new ProducerConfig(props);
> >             producer = new Producer<Integer, String>(config);
> >             int j=0;
> >             for(int i=0; i<10; i++) {
> >                 KeyedMessage<Integer, String> data = new
> > KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
> > "+System.currentTimeMillis());
> >                 producer.send(data);
> >
> >             }
> >
> >         } catch (Exception e) {
> >             System.out.println("Error happened: ");
> >             e.printStackTrace();
> >         } finally {
> >             if(null != null) {
> >                 producer.close();
> >             }
> >
> >             System.out.println("Ened of Sending");
> >         }
> >
> >         System.exit(0);
> >     }
> > }
> >
> >
> > My consumer:
> >
> > public class TestKafka08Consumer {
> >     public static void main(String [] args) throws UnknownHostException,
> > SocketException {
> >
> >         Properties props = new Properties();
> >         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
> >         props.put("group.id", "test08ConsumerId");
> >         props.put("zk.sessiontimeout.ms", "4000");
> >         props.put("zk.synctime.ms", "2000");
> >         props.put("autocommit.interval.ms", "1000");
> >
> >         ConsumerConfig consumerConfig = new ConsumerConfig(props);
> >
> >         ConsumerConnector consumerConnector =
> > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> >
> >         String topic = "test-topic";
> >         Map<String, Integer> topicCountMap = new HashMap<String,
> > Integer>();
> >         topicCountMap.put(topic, new Integer(1));
> >         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > consumerConnector.createMessageStreams(topicCountMap);
> >         KafkaStream<byte[], byte[]> stream =
> >  consumerMap.get(topic).get(0);
> >
> >         ConsumerIterator<byte[], byte[]> it = stream.iterator();
> >
> >         int counter=0;
> >         while(it.hasNext()) {
> >             try {
> >                 String fromPlatform = new String(it.next().message());
> >                 System.out.println("The messages: "+fromPlatform);
> >             } catch(Exception e) {
> >                 e.printStackTrace();
> >             }
> >         }
> >         System.out.println("SystemOut");
> >     }
> > }
> >
> >
> > Thanks
> >
>

Re: having problem with 0.8 gzip compression

Posted by Jun Rao <ju...@gmail.com>.
Did you start the consumer before the producer? By default, the consumer
gets only the new data.

Thanks,

Jun


On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
scott.wang@rumbleentertainment.com> wrote:

> I am testing with Kafka 0.8 beta and having problem of receiving message in
> consumer.  There is no error so does anyone have any insights.  When I
> commented out the "compression.code" everything works fine.
>
> My producer:
> public class TestKafka08Prod {
>
>     public static void main(String [] args) {
>
>         Producer<Integer, String> producer = null;
>         try {
>             Properties props = new Properties();
>             props.put("metadata.broker.list", "localhost:9092");
>             props.put("serializer.class",
> "kafka.serializer.StringEncoder");
>             props.put("producer.type", "sync");
>             props.put("request.required.acks","1");
>             props.put("compression.codec", "gzip");
>             ProducerConfig config = new ProducerConfig(props);
>             producer = new Producer<Integer, String>(config);
>             int j=0;
>             for(int i=0; i<10; i++) {
>                 KeyedMessage<Integer, String> data = new
> KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
> "+System.currentTimeMillis());
>                 producer.send(data);
>
>             }
>
>         } catch (Exception e) {
>             System.out.println("Error happened: ");
>             e.printStackTrace();
>         } finally {
>             if(null != null) {
>                 producer.close();
>             }
>
>             System.out.println("Ened of Sending");
>         }
>
>         System.exit(0);
>     }
> }
>
>
> My consumer:
>
> public class TestKafka08Consumer {
>     public static void main(String [] args) throws UnknownHostException,
> SocketException {
>
>         Properties props = new Properties();
>         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
>         props.put("group.id", "test08ConsumerId");
>         props.put("zk.sessiontimeout.ms", "4000");
>         props.put("zk.synctime.ms", "2000");
>         props.put("autocommit.interval.ms", "1000");
>
>         ConsumerConfig consumerConfig = new ConsumerConfig(props);
>
>         ConsumerConnector consumerConnector =
> kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>
>         String topic = "test-topic";
>         Map<String, Integer> topicCountMap = new HashMap<String,
> Integer>();
>         topicCountMap.put(topic, new Integer(1));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> consumerConnector.createMessageStreams(topicCountMap);
>         KafkaStream<byte[], byte[]> stream =
>  consumerMap.get(topic).get(0);
>
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>
>         int counter=0;
>         while(it.hasNext()) {
>             try {
>                 String fromPlatform = new String(it.next().message());
>                 System.out.println("The messages: "+fromPlatform);
>             } catch(Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         System.out.println("SystemOut");
>     }
> }
>
>
> Thanks
>