You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Edward Capriolo <ed...@gmail.com> on 2014/03/30 18:30:19 UTC
Kicking the tires on 0.8.1...tires kicking back
I am trying to convert a few projects to the latest kafka...
Is this the latest artifact?
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-perf_2.8.0</artifactId>
<version>0.8.1</version>
</dependency>
I have a piece of code
/**
 * Round-trip check: publishes messages to "mytopic" through KafkaWriter, then reads
 * them back with a high-level consumer running on a separate thread.
 *
 * NOTE(review): the reader loop only ends once every message has been seen. If the
 * consumer has no consumer.timeout.ms configured the iterator presumably blocks, so
 * lost messages would show up as a hang in t.join() rather than a failure - confirm.
 *
 * @throws InterruptedException if the test thread is interrupted while waiting for the reader
 */
@Test
public void test() throws InterruptedException {
    // One constant drives both the send loop and the reader's stop condition so they cannot drift apart.
    final int messageCount = 1000;
    //super.createTopic("mytopic", 1, 1);
    KafkaWriter kw = new KafkaWriter(super.createProducerConfig().props().props()
            , "mytopic"
            , new DefaultMessagePartitioner());
    kw.init();
    for (int i = 0; i < messageCount; i++) {
        System.out.println("sending");
        kw.send(("bla " + i + " yyy zzz").getBytes());
    }
    System.out.println("done");
    final ConsumerConnector consumerConnector =
            Consumer.createJavaConsumerConnector(super.createConsumerConfig());
    try {
        Map<String, Integer> consumers = new HashMap<String, Integer>();
        consumers.put("mytopic", 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams =
                consumerConnector.createMessageStreams(consumers);
        final List<KafkaStream<byte[], byte[]>> streams = topicMessageStreams.get("mytopic");
        final AtomicInteger in = new AtomicInteger();
        final KafkaStream<byte[], byte[]> stream = streams.get(0);
        Thread t = new Thread() {
            @Override
            public void run() {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    System.out.println("waiting for messages");
                    MessageAndMetadata<byte[], byte[]> m = it.next();
                    // NOTE(review): m.key() would be null if KafkaWriter sends unkeyed messages - verify.
                    System.out.println("key " + new String(m.key()));
                    System.out.println("message " + new String(m.message()));
                    int seen = in.incrementAndGet();
                    System.out.println("count " + seen);
                    if (seen == messageCount) {
                        break;
                    }
                }
            }
        };
        t.start();
        t.join();
    } finally {
        // Release the consumer's threads and ZooKeeper session even if the test is interrupted.
        consumerConnector.shutdown();
    }
}
protected static kafka.producer.ProducerConfig createProducerConfig(){
Properties producerProps = new Properties();
//producerProps.put("serializer.class",
"kafka.serializer.StringEncoder");
putZkConnect(producerProps, "localhost:"+zookeeperTestServer.getPort());
producerProps.setProperty("batch.size", "10");
producerProps.setProperty("producer.type", "async");
producerProps.put("metadata.broker.list", "localhost:9092");
return new kafka.producer.ProducerConfig(producerProps);
}
/**
 * Assembles the consumer configuration for the tests: ZooKeeper on the embedded test
 * server's port, consumer group "group1", and auto.offset.reset set to "smallest".
 *
 * @return a ConsumerConfig wrapping the assembled properties
 */
protected ConsumerConfig createConsumerConfig() {
    final Properties settings = new Properties();
    final String zkAddress = "localhost:" + zookeeperTestServer.getPort();
    putZkConnect(settings, zkAddress);
    putGroupId(settings, "group1");
    settings.put("auto.offset.reset", "smallest");
    return new ConsumerConfig(settings);
}
The first 200 messages I send seem to get lost in the ether.
I have also tried creating the topic myself.
/**
 * Creates a topic directly through AdminUtils, using the embedded ZooKeeper test server.
 *
 * @param name       topic name
 * @param replica    replication factor for the topic
 * @param partitions number of partitions for the topic
 */
public static void createTopic(String name, int replica, int partitions) {
    // Kafka reads and writes its ZooKeeper nodes as plain strings. A ZkClient left on its
    // default serializer writes topic metadata the brokers cannot parse, which surfaces as
    // LeaderNotAvailableException on the first produce. The 30s timeouts are an arbitrary
    // but generous bound for a local test server.
    ZkClient z = new ZkClient("localhost:" + zookeeperTestServer.getPort(),
            30000, 30000, kafka.utils.ZKStringSerializer$.MODULE$);
    try {
        // AdminUtils.createTopic expects (zkClient, topic, partitions, replicationFactor, config):
        // partitions come before the replication factor.
        AdminUtils.createTopic(z, name, partitions, replica, new Properties());
    } finally {
        z.close();
    }
}
This complains when I try to write to the topic:
2014-03-30 12:28:35 WARN BrokerPartitionInfo:83 - Error while fetching
metadata [{TopicMetadata for topic mytopic ->
No partition metadata for topic mytopic due to
kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
kafka.common.LeaderNotAvailableException
2014-03-30 12:28:35 WARN BrokerPartitionInfo:83 - Error while fetching
metadata [{TopicMetadata for topic mytopic ->
No partition metadata for topic mytopic due to
kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
kafka.common.LeaderNotAvailableException
2014-03-30 12:28:35 ERROR DefaultEventHandler:97 - Failed to collate
messages by topic, partition due to: Failed to fetch topic metadata for
topic: mytopic
Does anyone know what is going on? I went through these pains converting to
the 0.8.0 betas and I was hoping to be done dealing with this :)
Re: Kicking the tires on 0.8.1...tires kicking back
Posted by Edward Capriolo <ed...@gmail.com>.
The same code works when I switch to 0.8.0
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.0</version>
</dependency>
And use my own
/**
 * Creates a topic by invoking Kafka 0.8.0's CreateTopicCommand with command-line style
 * arguments, then pauses for one second before returning.
 *
 * @param name       topic name
 * @param replica    value passed to the command's --replica option
 * @param partitions value passed to the command's --partition option
 */
public static void createTopic(String name, int replica, int partitions) {
    String[] arguments = {
        "--zookeeper", "localhost:" + zookeeperTestServer.getPort(),
        "--replica", String.valueOf(replica),
        "--partition", String.valueOf(partitions),
        "--topic", name
    };
    CreateTopicCommand.main(arguments);
    try {
        // Presumably gives the broker time to elect a leader for the new topic - TODO confirm.
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers can still observe the interruption.
        Thread.currentThread().interrupt();
    }
}
On Sun, Mar 30, 2014 at 12:30 PM, Edward Capriolo <ed...@gmail.com> wrote:
> I am trying to convert a few projects to the latest kafka...
> Is this the latest artifact?
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-perf_2.8.0</artifactId>
> <version>0.8.1</version>
> </dependency>
>
> I have a piece of code
>
> @Test
> public void test() throws InterruptedException {
> //super.createTopic("mytopic", 1, 1);
> KafkaWriter kw = new
> KafkaWriter(super.createProducerConfig().props().props()
> , "mytopic"
> , new DefaultMessagePartitioner());
> kw.init();
> for (int i =0 ;i< 1000 ; i++){
> System.out.println("sending");
> kw.send(("bla "+i+" yyy zzz").getBytes());
> }
> System.out.println("done");
> ConsumerConnector consumerConnector =
> Consumer.createJavaConsumerConnector(super.createConsumerConfig());
>
> Map<String, Integer> consumers = new HashMap<String, Integer>();
> consumers.put("mytopic", 1);
> Map<String, List<KafkaStream<byte[], byte []>>> topicMessageStreams =
> consumerConnector
> .createMessageStreams(consumers);
>
> final List<KafkaStream<byte[], byte[]>> streams =
> topicMessageStreams.get("mytopic");
> final AtomicInteger in = new AtomicInteger();
> final KafkaStream<byte[], byte[]> stream = streams.get(0);
>
> Thread t = new Thread() {
> public void run() {
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
> while (it.hasNext()){
> System.out.println("waiting for messages");
> MessageAndMetadata<byte[], byte[]> m = it.next();
> System.out.println("key " + new String(m.key()));
> System.out.println("message " + new String(m.message()));
> in.incrementAndGet();
> System.out.println("count " + in.get());
> if (in.get() == 999) {
> break;
> }
> }
> }
> };
>
> t.start();
> t.join();
>
> }
>
> protected static kafka.producer.ProducerConfig createProducerConfig(){
> Properties producerProps = new Properties();
> //producerProps.put("serializer.class",
> "kafka.serializer.StringEncoder");
> putZkConnect(producerProps,
> "localhost:"+zookeeperTestServer.getPort());
> producerProps.setProperty("batch.size", "10");
> producerProps.setProperty("producer.type", "async");
> producerProps.put("metadata.broker.list", "localhost:9092");
> return new kafka.producer.ProducerConfig(producerProps);
> }
>
> protected ConsumerConfig createConsumerConfig(){
> Properties consumerProps = new Properties();
> putZkConnect(consumerProps,
> "localhost:"+zookeeperTestServer.getPort());
> putGroupId(consumerProps, "group1");
> consumerProps.put("auto.offset.reset", "smallest");
> ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
> return consumerConfig;
> }
>
> The first 200 messages I send seem to get lost in the ether.
>
> I have also tried creating the topic myself.
>
> public static void createTopic(String name, int replica , int partitions
> ){
> ZkClient z = new ZkClient("localhost:"+zookeeperTestServer.getPort());
> Properties p = new Properties();
> AdminUtils.createTopic(z, name, replica, partitions, p);
> }
>
> Which complains when i try to write to the topic.
>
> 014-03-30 12:28:35 WARN BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 WARN BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 ERROR DefaultEventHandler:97 - Failed to collate
> messages by topic, partition due to: Failed to fetch topic metadata for
> topic: mytopic
>
>
> Does anyone know what is going on. I went through these pains converting
> to 0.8.0-betas and I was hoping to be done dealing with this :)
>
Re: Kicking the tires on 0.8.1...tires kicking back
Posted by "Michael G. Noll" <mi...@michael-noll.com>.
You might be running into the following known bug in 0.8.1:
https://issues.apache.org/jira/browse/KAFKA-1310
The fix is to downgrade to 0.8.0, or to migrate to 0.8.1.1 once it gets
released (IIRC the tentative 0.8.1.1 release date is mid-April).
Best,
Michael
On 30.03.2014 18:30, Edward Capriolo wrote:
> I am trying to convert a few projects to the latest kafka...
> Is this the latest artifact?
> <dependency>
> <groupId>org.apache.kafka</groupId>
> <artifactId>kafka-perf_2.8.0</artifactId>
> <version>0.8.1</version>
> </dependency>
>
> I have a piece of code
>
> @Test
> public void test() throws InterruptedException {
> //super.createTopic("mytopic", 1, 1);
> KafkaWriter kw = new
> KafkaWriter(super.createProducerConfig().props().props()
> , "mytopic"
> , new DefaultMessagePartitioner());
> kw.init();
> for (int i =0 ;i< 1000 ; i++){
> System.out.println("sending");
> kw.send(("bla "+i+" yyy zzz").getBytes());
> }
> System.out.println("done");
> ConsumerConnector consumerConnector =
> Consumer.createJavaConsumerConnector(super.createConsumerConfig());
>
> Map<String, Integer> consumers = new HashMap<String, Integer>();
> consumers.put("mytopic", 1);
> Map<String, List<KafkaStream<byte[], byte []>>> topicMessageStreams =
> consumerConnector
> .createMessageStreams(consumers);
>
> final List<KafkaStream<byte[], byte[]>> streams =
> topicMessageStreams.get("mytopic");
> final AtomicInteger in = new AtomicInteger();
> final KafkaStream<byte[], byte[]> stream = streams.get(0);
>
> Thread t = new Thread() {
> public void run() {
> ConsumerIterator<byte[], byte[]> it = stream.iterator();
> while (it.hasNext()){
> System.out.println("waiting for messages");
> MessageAndMetadata<byte[], byte[]> m = it.next();
> System.out.println("key " + new String(m.key()));
> System.out.println("message " + new String(m.message()));
> in.incrementAndGet();
> System.out.println("count " + in.get());
> if (in.get() == 999) {
> break;
> }
> }
> }
> };
>
> t.start();
> t.join();
>
> }
>
> protected static kafka.producer.ProducerConfig createProducerConfig(){
> Properties producerProps = new Properties();
> //producerProps.put("serializer.class",
> "kafka.serializer.StringEncoder");
> putZkConnect(producerProps, "localhost:"+zookeeperTestServer.getPort());
> producerProps.setProperty("batch.size", "10");
> producerProps.setProperty("producer.type", "async");
> producerProps.put("metadata.broker.list", "localhost:9092");
> return new kafka.producer.ProducerConfig(producerProps);
> }
>
> protected ConsumerConfig createConsumerConfig(){
> Properties consumerProps = new Properties();
> putZkConnect(consumerProps, "localhost:"+zookeeperTestServer.getPort());
> putGroupId(consumerProps, "group1");
> consumerProps.put("auto.offset.reset", "smallest");
> ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
> return consumerConfig;
> }
>
> The first 200 messages I send seem to get lost in the ether.
>
> I have also tried creating the topic myself.
>
> public static void createTopic(String name, int replica , int partitions
> ){
> ZkClient z = new ZkClient("localhost:"+zookeeperTestServer.getPort());
> Properties p = new Properties();
> AdminUtils.createTopic(z, name, replica, partitions, p);
> }
>
> Which complains when i try to write to the topic.
>
> 014-03-30 12:28:35 WARN BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 WARN BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 ERROR DefaultEventHandler:97 - Failed to collate
> messages by topic, partition due to: Failed to fetch topic metadata for
> topic: mytopic
>
>
> Does anyone know what is going on. I went through these pains converting to
> 0.8.0-betas and I was hoping to be done dealing with this :)
>