Posted to dev@kafka.apache.org by Scott Wang <sc...@rumbleentertainment.com> on 2013/07/08 23:53:52 UTC
having problem with 0.8 gzip compression
I am testing with the Kafka 0.8 beta and am having a problem receiving messages in
the consumer. There is no error, so does anyone have any insights? When I
comment out the "compression.codec" line, everything works fine.
My producer:
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TestKafka08Prod {

    public static void main(String[] args) {
        Producer<Integer, String> producer = null;
        try {
            Properties props = new Properties();
            props.put("metadata.broker.list", "localhost:9092");
            props.put("serializer.class", "kafka.serializer.StringEncoder");
            props.put("producer.type", "sync");
            props.put("request.required.acks", "1");
            props.put("compression.codec", "gzip");
            ProducerConfig config = new ProducerConfig(props);
            producer = new Producer<Integer, String>(config);
            for (int i = 0; i < 10; i++) {
                KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(
                        "test-topic", "test-message: " + i + " " + System.currentTimeMillis());
                producer.send(data);
            }
        } catch (Exception e) {
            System.out.println("Error happened: ");
            e.printStackTrace();
        } finally {
            if (producer != null) {
                producer.close();
            }
            System.out.println("End of Sending");
        }
        System.exit(0);
    }
}
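As background for anyone reproducing this: Kafka's gzip codec compresses the serialized message set with java.util.zip's gzip streams, so the payload itself is ordinary gzip data. The following standalone sketch (no Kafka classes involved; the class and method names are made up for illustration) just shows that a message string survives such a round trip intact:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Standalone gzip round-trip demo, analogous to what the gzip codec
// does to a message payload. Pure JDK, no Kafka dependency.
class GzipRoundTrip {

    // Compress a byte array with gzip.
    static byte[] gzip(byte[] plain) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(buf)) {
            out.write(plain);
        } // close() finishes the gzip trailer before we read buf
        return buf.toByteArray();
    }

    // Decompress a gzip byte array back to the original bytes.
    static byte[] gunzip(byte[] compressed) throws IOException {
        GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        int n;
        while ((n = in.read(chunk)) > 0) {
            buf.write(chunk, 0, n);
        }
        return buf.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        String msg = "test-message: 0 " + System.currentTimeMillis();
        byte[] packed = gzip(msg.getBytes("UTF-8"));
        String unpacked = new String(gunzip(packed), "UTF-8");
        System.out.println("round trip ok: " + msg.equals(unpacked));
    }
}
```

If the round trip works locally but compressed messages never reach the consumer, the failure is on the Kafka side (producer, broker, or mismatched jars), not in the gzip step itself.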
My consumer:
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class TestKafka08Consumer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
        props.put("group.id", "test08ConsumerId");
        props.put("zookeeper.session.timeout.ms", "4000");
        props.put("zookeeper.sync.time.ms", "2000");
        props.put("auto.commit.interval.ms", "1000");

        ConsumerConfig consumerConfig = new ConsumerConfig(props);
        ConsumerConnector consumerConnector =
                kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);

        String topic = "test-topic";
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                consumerConnector.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        while (it.hasNext()) {
            try {
                String fromPlatform = new String(it.next().message());
                System.out.println("The messages: " + fromPlatform);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("SystemOut");
    }
}
Thanks
Re: having problem with 0.8 gzip compression
Posted by Scott Wang <sc...@rumbleentertainment.com>.
OK, the problem is solved. I think it was because some of the jar files I was
using were old: I had been building the producer and consumer in my 0.7
environment and only swapping out the Kafka jar. I created a whole new
environment and pulled in all of the jar files from 0.8, and that solved my
0.8 gzip problem. Thank you for all the help.
Re: having problem with 0.8 gzip compression
Posted by Scott Wang <sc...@rumbleentertainment.com>.
Joel,
Would you mind pointing me to how to enable trace logs in the producer and
broker?
Thanks,
Scott
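For reference: in 0.8, both the producer and the broker log through log4j, so TRACE can be turned on per logger. A minimal sketch (logger names assumed from the 0.8 package layout; the broker ships a config/log4j.properties in the distribution):

```properties
# Producer side: add to the log4j.properties on the producer's classpath
log4j.logger.kafka.producer=TRACE

# Broker side: raise the level in config/log4j.properties of the 0.8 distribution
log4j.logger.kafka=TRACE
```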
On Wed, Jul 10, 2013 at 5:33 PM, Joel Koshy <jj...@gmail.com> wrote:
> Weird - I tried your exact code and it worked for me (although I was
> using 0.8 head and not the beta). Can you re-run with trace logs
> enabled in your producer and paste that output? Broker logs also if
> you can?
>
> Thanks,
>
> Joel
Re: having problem with 0.8 gzip compression
Posted by Joel Koshy <jj...@gmail.com>.
Weird - I tried your exact code and it worked for me (although I was
using 0.8 head and not the beta). Can you re-run with trace logs
enabled in your producer and paste that output? Broker logs also if
you can?
Thanks,
Joel
Re: having problem with 0.8 gzip compression
Posted by Scott Wang <sc...@rumbleentertainment.com>.
Jun,
I did a test this morning and got a very interesting result with your
command. I started by wiping all the log files and cleaning up all the
ZooKeeper data files.
Once I restarted the server, producer, and consumer and executed your
command, what I got was an empty log, as follows:
Dumping /Users/scott/Temp/kafka/test-topic-0/00000000000000000000.log
Starting offset: 0
One observation: the 00000000000000000000.index file was getting huge, but
there was nothing in the 00000000000000000000.log file.
Thanks,
Scott
On Tue, Jul 9, 2013 at 8:40 PM, Jun Rao <ju...@gmail.com> wrote:
> Could you run the following command on one of the log files of your topic
> and attach the output?
>
> bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /tmp/kafka-logs/testtopic-0/00000000000000000000.log
>
> Thanks,
>
> Jun
Re: having problem with 0.8 gzip compression
Posted by Jun Rao <ju...@gmail.com>.
Could you run the following command on one of the log files of your topic
and attach the output?
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
/tmp/kafka-logs/testtopic-0/00000000000000000000.log
Thanks,
Jun
Re: having problem with 0.8 gzip compression
Posted by Scott Wang <sc...@rumbleentertainment.com>.
Another piece of information: snappy compression also does not work.
Thanks,
Scott
On Tue, Jul 9, 2013 at 11:07 AM, Scott Wang <
scott.wang@rumbleentertainment.com> wrote:
> I just tried it and it is still not showing up, thanks for looking into this.
>
> Thanks,
> Scott
>
>
> On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <ju...@gmail.com> wrote:
>
>> Could you try starting the consumer first (and enable gzip in the
>> producer)?
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
>> scott.wang@rumbleentertainment.com> wrote:
>>
>> > No, I did not start the consumer before the producer. I actually
>> > started the producer first, and nothing showed up in the consumer unless I
>> > commented out this line -- props.put("compression.codec", "gzip"). If I
>> > commented out the compression codec, everything just works.
>> >
>> >
>> > On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
>> >
>> > > Did you start the consumer before the producer? By default, the
>> > > consumer gets only the new data.
>> > >
>> > > Thanks,
>> > >
>> > > Jun
>> > >
>> > >
>> > > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
>> > > scott.wang@rumbleentertainment.com> wrote:
>> > >
>> > > > I am testing with the Kafka 0.8 beta and am having a problem receiving
>> > > > messages in the consumer. There is no error, so does anyone have any
>> > > > insights? When I comment out the "compression.codec" line, everything
>> > > > works fine.
>> > > > consumerMap.get(topic).get(0);
>> > > >
>> > > > ConsumerIterator<byte[], byte[]> it = stream.iterator();
>> > > >
>> > > > int counter=0;
>> > > > while(it.hasNext()) {
>> > > > try {
>> > > > String fromPlatform = new
>> String(it.next().message());
>> > > > System.out.println("The messages: "+fromPlatform);
>> > > > } catch(Exception e) {
>> > > > e.printStackTrace();
>> > > > }
>> > > > }
>> > > > System.out.println("SystemOut");
>> > > > }
>> > > > }
>> > > >
>> > > >
>> > > > Thanks
>> > > >
>> > >
>> >
>>
>
>
Re: having problem with 0.8 gzip compression
Posted by Scott Wang <sc...@rumbleentertainment.com>.
I just tried it and it is still not showing up; thanks for looking into this.
Thanks,
Scott
On Tue, Jul 9, 2013 at 8:06 AM, Jun Rao <ju...@gmail.com> wrote:
> Could you try starting the consumer first (and enable gzip in the
> producer)?
>
> Thanks,
>
> Jun
>
> [rest of quoted thread trimmed]
Re: having problem with 0.8 gzip compression
Posted by Jun Rao <ju...@gmail.com>.
Could you try starting the consumer first (and enable gzip in the producer)?
Thanks,
Jun
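(The same check can be done without reordering the programs: the 0.8 high-level consumer only delivers messages produced after it connects, unless it has committed offsets or is told to rewind. A hedged sketch of the consumer properties from this thread, adjusted to read from the start of the log; `RewindConsumerProps` and the "-fresh" group id are hypothetical, the latter just ensuring no previously committed offset takes precedence over "auto.offset.reset".)

```java
import java.util.Properties;

public class RewindConsumerProps {
    // Consumer Properties from the thread, adjusted so the consumer reads
    // from the beginning of the log rather than only new data.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
        props.put("group.id", "test08ConsumerId-fresh"); // hypothetical new group: no stored offsets
        props.put("auto.offset.reset", "smallest");      // rewind to earliest when no offset exists
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("auto.offset.reset"));
    }
}
```

With these properties the producer can run first and the consumer should still see the compressed batch, which separates "messages were never written" from "messages were written but skipped".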
On Mon, Jul 8, 2013 at 9:37 PM, Scott Wang <
scott.wang@rumbleentertainment.com> wrote:
> No, I did not start the consumer before the producer. I actually started
> the producer first and nothing showed up in the consumer unless I commented
> out this line -- props.put("compression.codec", "gzip"). If I commented
> out the compression codec, everything just works.
>
>
> On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
>
> > Did you start the consumer before the producer? Be default, the consumer
> > gets only the new data?
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
> > scott.wang@rumbleentertainment.com> wrote:
> >
> > > I am testing with Kafka 0.8 beta and having problem of receiving
> message
> > in
> > > consumer. There is no error so does anyone have any insights. When I
> > > commented out the "compression.code" everything works fine.
> > >
> > > My producer:
> > > public class TestKafka08Prod {
> > >
> > > public static void main(String [] args) {
> > >
> > > Producer<Integer, String> producer = null;
> > > try {
> > > Properties props = new Properties();
> > > props.put("metadata.broker.list", "localhost:9092");
> > > props.put("serializer.class",
> > > "kafka.serializer.StringEncoder");
> > > props.put("producer.type", "sync");
> > > props.put("request.required.acks","1");
> > > props.put("compression.codec", "gzip");
> > > ProducerConfig config = new ProducerConfig(props);
> > > producer = new Producer<Integer, String>(config);
> > > int j=0;
> > > for(int i=0; i<10; i++) {
> > > KeyedMessage<Integer, String> data = new
> > > KeyedMessage<Integer, String>("test-topic", "test-message: "+i+"
> > > "+System.currentTimeMillis());
> > > producer.send(data);
> > >
> > > }
> > >
> > > } catch (Exception e) {
> > > System.out.println("Error happened: ");
> > > e.printStackTrace();
> > > } finally {
> > > if(null != null) {
> > > producer.close();
> > > }
> > >
> > > System.out.println("Ened of Sending");
> > > }
> > >
> > > System.exit(0);
> > > }
> > > }
> > >
> > >
> > > My consumer:
> > >
> > > public class TestKafka08Consumer {
> > > public static void main(String [] args) throws
> UnknownHostException,
> > > SocketException {
> > >
> > > Properties props = new Properties();
> > > props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
> > > props.put("group.id", "test08ConsumerId");
> > > props.put("zk.sessiontimeout.ms", "4000");
> > > props.put("zk.synctime.ms", "2000");
> > > props.put("autocommit.interval.ms", "1000");
> > >
> > > ConsumerConfig consumerConfig = new ConsumerConfig(props);
> > >
> > > ConsumerConnector consumerConnector =
> > > kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
> > >
> > > String topic = "test-topic";
> > > Map<String, Integer> topicCountMap = new HashMap<String,
> > > Integer>();
> > > topicCountMap.put(topic, new Integer(1));
> > > Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
> > > consumerConnector.createMessageStreams(topicCountMap);
> > > KafkaStream<byte[], byte[]> stream =
> > > consumerMap.get(topic).get(0);
> > >
> > > ConsumerIterator<byte[], byte[]> it = stream.iterator();
> > >
> > > int counter=0;
> > > while(it.hasNext()) {
> > > try {
> > > String fromPlatform = new String(it.next().message());
> > > System.out.println("The messages: "+fromPlatform);
> > > } catch(Exception e) {
> > > e.printStackTrace();
> > > }
> > > }
> > > System.out.println("SystemOut");
> > > }
> > > }
> > >
> > >
> > > Thanks
> > >
> >
>
Re: having problem with 0.8 gzip compression
Posted by Scott Wang <sc...@rumbleentertainment.com>.
No, I did not start the consumer before the producer. I actually started
the producer first and nothing showed up in the consumer unless I commented
out this line -- props.put("compression.codec", "gzip"). If I commented
out the compression codec, everything just works.
On Mon, Jul 8, 2013 at 9:07 PM, Jun Rao <ju...@gmail.com> wrote:
> Did you start the consumer before the producer? By default, the consumer
> gets only the new data.
>
> Thanks,
>
> Jun
>
>
> [rest of quoted thread trimmed]
Re: having problem with 0.8 gzip compression
Posted by Jun Rao <ju...@gmail.com>.
Did you start the consumer before the producer? By default, the consumer
gets only the new data.
Thanks,
Jun
On Mon, Jul 8, 2013 at 2:53 PM, Scott Wang <
scott.wang@rumbleentertainment.com> wrote:
> I am testing with the Kafka 0.8 beta and am having a problem receiving
> messages in the consumer. There is no error, so does anyone have any
> insights? When I comment out the "compression.codec" line, everything
> works fine.
>
> My producer:
> public class TestKafka08Prod {
>
>     public static void main(String[] args) {
>         Producer<Integer, String> producer = null;
>         try {
>             Properties props = new Properties();
>             props.put("metadata.broker.list", "localhost:9092");
>             props.put("serializer.class", "kafka.serializer.StringEncoder");
>             props.put("producer.type", "sync");
>             props.put("request.required.acks", "1");
>             props.put("compression.codec", "gzip");
>             ProducerConfig config = new ProducerConfig(props);
>             producer = new Producer<Integer, String>(config);
>             for (int i = 0; i < 10; i++) {
>                 KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(
>                         "test-topic", "test-message: " + i + " " + System.currentTimeMillis());
>                 producer.send(data);
>             }
>         } catch (Exception e) {
>             System.out.println("Error happened: ");
>             e.printStackTrace();
>         } finally {
>             if (null != producer) {
>                 producer.close();
>             }
>             System.out.println("End of Sending");
>         }
>         System.exit(0);
>     }
> }
>
>
> My consumer:
>
> public class TestKafka08Consumer {
>
>     public static void main(String[] args) throws UnknownHostException, SocketException {
>         Properties props = new Properties();
>         props.put("zookeeper.connect", "localhost:2181/kafka_0_8");
>         props.put("group.id", "test08ConsumerId");
>         props.put("zk.sessiontimeout.ms", "4000");
>         props.put("zk.synctime.ms", "2000");
>         props.put("autocommit.interval.ms", "1000");
>
>         ConsumerConfig consumerConfig = new ConsumerConfig(props);
>         ConsumerConnector consumerConnector =
>                 kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
>
>         String topic = "test-topic";
>         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
>         topicCountMap.put(topic, new Integer(1));
>         Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
>                 consumerConnector.createMessageStreams(topicCountMap);
>         KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
>
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         while (it.hasNext()) {
>             try {
>                 String fromPlatform = new String(it.next().message());
>                 System.out.println("The messages: " + fromPlatform);
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         }
>         System.out.println("SystemOut");
>     }
> }
>
>
> Thanks
>
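(One more data point that can help localize a bug like this: in 0.8 the producer gzips the whole message set and the consumer iterator undoes it, so application code should see the original bytes unchanged. The JDK round-trip below, plain java.util.zip with no Kafka involved, is a minimal sanity check that a payload survives gzip; if it passes, the suspect is the broker/consumer path rather than the codec itself. `GzipRoundTrip` is a hypothetical name for illustration.)

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTrip {
    // Compress then decompress a payload with the JDK's gzip streams and
    // return the recovered string; it should equal the input exactly.
    public static String roundTrip(String payload) throws Exception {
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(compressed)) {
            gz.write(payload.getBytes(StandardCharsets.UTF_8));
        } // close() finishes the gzip stream

        ByteArrayOutputStream recovered = new ByteArrayOutputStream();
        try (GZIPInputStream in =
                new GZIPInputStream(new ByteArrayInputStream(compressed.toByteArray()))) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                recovered.write(chunk, 0, n);
            }
        }
        return new String(recovered.toByteArray(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip("test-message: 0 " + System.currentTimeMillis()));
    }
}
```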