You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Edward Capriolo <ed...@gmail.com> on 2014/03/30 18:30:19 UTC

Kicking the tires on 0.8.1...tires kicking back

I am trying to convert a few projects to the latest kafka...
Is this the latest artifact?
  <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-perf_2.8.0</artifactId>
                <version>0.8.1</version>
        </dependency>

I have a piece of code

  @Test
  public void test() throws InterruptedException {
    //super.createTopic("mytopic", 1, 1);
    KafkaWriter kw = new
KafkaWriter(super.createProducerConfig().props().props()
            , "mytopic"
            , new DefaultMessagePartitioner());
    kw.init();
    for (int i =0 ;i< 1000 ; i++){
      System.out.println("sending");
      kw.send(("bla "+i+" yyy zzz").getBytes());
    }
    System.out.println("done");
    ConsumerConnector consumerConnector =
Consumer.createJavaConsumerConnector(super.createConsumerConfig());

    Map<String, Integer> consumers = new HashMap<String, Integer>();
    consumers.put("mytopic", 1);
    Map<String, List<KafkaStream<byte[], byte []>>> topicMessageStreams =
consumerConnector
            .createMessageStreams(consumers);

    final List<KafkaStream<byte[], byte[]>> streams =
topicMessageStreams.get("mytopic");
    final AtomicInteger in = new AtomicInteger();
    final KafkaStream<byte[], byte[]> stream = streams.get(0);

    Thread t = new Thread() {
      public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()){
          System.out.println("waiting for messages");
          MessageAndMetadata<byte[], byte[]> m = it.next();
          System.out.println("key " + new String(m.key()));
          System.out.println("message " + new String(m.message()));
          in.incrementAndGet();
          System.out.println("count " + in.get());
          if (in.get() == 999) {
            break;
          }
        }
      }
    };

    t.start();
    t.join();

  }

 protected static kafka.producer.ProducerConfig createProducerConfig(){
    Properties producerProps = new Properties();
    //producerProps.put("serializer.class",
"kafka.serializer.StringEncoder");
    putZkConnect(producerProps, "localhost:"+zookeeperTestServer.getPort());
    producerProps.setProperty("batch.size", "10");
    producerProps.setProperty("producer.type", "async");
    producerProps.put("metadata.broker.list", "localhost:9092");
    return new kafka.producer.ProducerConfig(producerProps);
  }

  protected ConsumerConfig createConsumerConfig(){
    Properties consumerProps = new Properties();
    putZkConnect(consumerProps, "localhost:"+zookeeperTestServer.getPort());
    putGroupId(consumerProps, "group1");
    consumerProps.put("auto.offset.reset", "smallest");
    ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
    return consumerConfig;
  }

The first 200 messages I send seem to get lost in the ether.

I have also tried creating the topic myself.

  public static void createTopic(String name, int replica , int partitions
){
    ZkClient z = new ZkClient("localhost:"+zookeeperTestServer.getPort());
    Properties p  = new Properties();
    AdminUtils.createTopic(z, name, replica, partitions, p);
  }

Which complains when i try to write to the topic.

014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
metadata [{TopicMetadata for topic mytopic ->
No partition metadata for topic mytopic due to
kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
kafka.common.LeaderNotAvailableException
2014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
metadata [{TopicMetadata for topic mytopic ->
No partition metadata for topic mytopic due to
kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
kafka.common.LeaderNotAvailableException
2014-03-30 12:28:35 ERROR DefaultEventHandler:97 - Failed to collate
messages by topic, partition due to: Failed to fetch topic metadata for
topic: mytopic


Does anyone know what is going on. I went through these pains converting to
0.8.0-betas and I was hoping to be done dealing with this :)

Re: Kicking the tires on 0.8.1...tires kicking back

Posted by Edward Capriolo <ed...@gmail.com>.
The same code works when I switch to 0.8.0
             <dependency>
                        <groupId>org.apache.kafka</groupId>
                        <artifactId>kafka_2.9.2</artifactId>
                       <version>0.8.0</version>
                </dependency>

And use my own

  public static void createTopic(String name, int replica, int partitions )
{
    String[] arguments = new String[8];
    arguments[0] = "--zookeeper";
    arguments[1] = "localhost:"+zookeeperTestServer.getPort();
    arguments[2] = "--replica";
    arguments[3] = replica+"";
    arguments[4] = "--partition";
    arguments[5] = partitions+"";
    arguments[6] = "--topic";
    arguments[7] = name;

    CreateTopicCommand.main(arguments);
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }




On Sun, Mar 30, 2014 at 12:30 PM, Edward Capriolo <ed...@gmail.com>wrote:

> I am trying to convert a few projects to the latest kafka...
> Is this the latest artifact?
>   <dependency>
>                 <groupId>org.apache.kafka</groupId>
>                 <artifactId>kafka-perf_2.8.0</artifactId>
>                 <version>0.8.1</version>
>         </dependency>
>
> I have a piece of code
>
>   @Test
>   public void test() throws InterruptedException {
>     //super.createTopic("mytopic", 1, 1);
>     KafkaWriter kw = new
> KafkaWriter(super.createProducerConfig().props().props()
>             , "mytopic"
>             , new DefaultMessagePartitioner());
>     kw.init();
>     for (int i =0 ;i< 1000 ; i++){
>       System.out.println("sending");
>       kw.send(("bla "+i+" yyy zzz").getBytes());
>     }
>     System.out.println("done");
>     ConsumerConnector consumerConnector =
> Consumer.createJavaConsumerConnector(super.createConsumerConfig());
>
>     Map<String, Integer> consumers = new HashMap<String, Integer>();
>     consumers.put("mytopic", 1);
>     Map<String, List<KafkaStream<byte[], byte []>>> topicMessageStreams =
> consumerConnector
>             .createMessageStreams(consumers);
>
>     final List<KafkaStream<byte[], byte[]>> streams =
> topicMessageStreams.get("mytopic");
>     final AtomicInteger in = new AtomicInteger();
>     final KafkaStream<byte[], byte[]> stream = streams.get(0);
>
>     Thread t = new Thread() {
>       public void run() {
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         while (it.hasNext()){
>           System.out.println("waiting for messages");
>           MessageAndMetadata<byte[], byte[]> m = it.next();
>           System.out.println("key " + new String(m.key()));
>           System.out.println("message " + new String(m.message()));
>           in.incrementAndGet();
>           System.out.println("count " + in.get());
>           if (in.get() == 999) {
>             break;
>           }
>         }
>       }
>     };
>
>     t.start();
>     t.join();
>
>   }
>
>  protected static kafka.producer.ProducerConfig createProducerConfig(){
>     Properties producerProps = new Properties();
>     //producerProps.put("serializer.class",
> "kafka.serializer.StringEncoder");
>     putZkConnect(producerProps,
> "localhost:"+zookeeperTestServer.getPort());
>     producerProps.setProperty("batch.size", "10");
>     producerProps.setProperty("producer.type", "async");
>     producerProps.put("metadata.broker.list", "localhost:9092");
>     return new kafka.producer.ProducerConfig(producerProps);
>   }
>
>   protected ConsumerConfig createConsumerConfig(){
>     Properties consumerProps = new Properties();
>     putZkConnect(consumerProps,
> "localhost:"+zookeeperTestServer.getPort());
>     putGroupId(consumerProps, "group1");
>     consumerProps.put("auto.offset.reset", "smallest");
>     ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
>     return consumerConfig;
>   }
>
> The first 200 messages I send seem to get lost in the ether.
>
> I have also tried creating the topic myself.
>
>   public static void createTopic(String name, int replica , int partitions
> ){
>     ZkClient z = new ZkClient("localhost:"+zookeeperTestServer.getPort());
>     Properties p  = new Properties();
>     AdminUtils.createTopic(z, name, replica, partitions, p);
>   }
>
> Which complains when i try to write to the topic.
>
> 014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 ERROR DefaultEventHandler:97 - Failed to collate
> messages by topic, partition due to: Failed to fetch topic metadata for
> topic: mytopic
>
>
> Does anyone know what is going on. I went through these pains converting
> to 0.8.0-betas and I was hoping to be done dealing with this :)
>

Re: Kicking the tires on 0.8.1...tires kicking back

Posted by "Michael G. Noll" <mi...@michael-noll.com>.
You might be running into the following known bug in 0.8.1:
https://issues.apache.org/jira/browse/KAFKA-1310

The fix is to downgrade to 0.8.0, or to migrate to 0.8.1.1 once it gets
released (IIRC the tentative 0.8.1.1 release date is mid-April).

Best,
Michael




On 30.03.2014 18:30, Edward Capriolo wrote:
> I am trying to convert a few projects to the latest kafka...
> Is this the latest artifact?
>   <dependency>
>                 <groupId>org.apache.kafka</groupId>
>                 <artifactId>kafka-perf_2.8.0</artifactId>
>                 <version>0.8.1</version>
>         </dependency>
> 
> I have a piece of code
> 
>   @Test
>   public void test() throws InterruptedException {
>     //super.createTopic("mytopic", 1, 1);
>     KafkaWriter kw = new
> KafkaWriter(super.createProducerConfig().props().props()
>             , "mytopic"
>             , new DefaultMessagePartitioner());
>     kw.init();
>     for (int i =0 ;i< 1000 ; i++){
>       System.out.println("sending");
>       kw.send(("bla "+i+" yyy zzz").getBytes());
>     }
>     System.out.println("done");
>     ConsumerConnector consumerConnector =
> Consumer.createJavaConsumerConnector(super.createConsumerConfig());
> 
>     Map<String, Integer> consumers = new HashMap<String, Integer>();
>     consumers.put("mytopic", 1);
>     Map<String, List<KafkaStream<byte[], byte []>>> topicMessageStreams =
> consumerConnector
>             .createMessageStreams(consumers);
> 
>     final List<KafkaStream<byte[], byte[]>> streams =
> topicMessageStreams.get("mytopic");
>     final AtomicInteger in = new AtomicInteger();
>     final KafkaStream<byte[], byte[]> stream = streams.get(0);
> 
>     Thread t = new Thread() {
>       public void run() {
>         ConsumerIterator<byte[], byte[]> it = stream.iterator();
>         while (it.hasNext()){
>           System.out.println("waiting for messages");
>           MessageAndMetadata<byte[], byte[]> m = it.next();
>           System.out.println("key " + new String(m.key()));
>           System.out.println("message " + new String(m.message()));
>           in.incrementAndGet();
>           System.out.println("count " + in.get());
>           if (in.get() == 999) {
>             break;
>           }
>         }
>       }
>     };
> 
>     t.start();
>     t.join();
> 
>   }
> 
>  protected static kafka.producer.ProducerConfig createProducerConfig(){
>     Properties producerProps = new Properties();
>     //producerProps.put("serializer.class",
> "kafka.serializer.StringEncoder");
>     putZkConnect(producerProps, "localhost:"+zookeeperTestServer.getPort());
>     producerProps.setProperty("batch.size", "10");
>     producerProps.setProperty("producer.type", "async");
>     producerProps.put("metadata.broker.list", "localhost:9092");
>     return new kafka.producer.ProducerConfig(producerProps);
>   }
> 
>   protected ConsumerConfig createConsumerConfig(){
>     Properties consumerProps = new Properties();
>     putZkConnect(consumerProps, "localhost:"+zookeeperTestServer.getPort());
>     putGroupId(consumerProps, "group1");
>     consumerProps.put("auto.offset.reset", "smallest");
>     ConsumerConfig consumerConfig = new ConsumerConfig(consumerProps);
>     return consumerConfig;
>   }
> 
> The first 200 messages I send seem to get lost in the ether.
> 
> I have also tried creating the topic myself.
> 
>   public static void createTopic(String name, int replica , int partitions
> ){
>     ZkClient z = new ZkClient("localhost:"+zookeeperTestServer.getPort());
>     Properties p  = new Properties();
>     AdminUtils.createTopic(z, name, replica, partitions, p);
>   }
> 
> Which complains when i try to write to the topic.
> 
> 014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 WARN  BrokerPartitionInfo:83 - Error while fetching
> metadata [{TopicMetadata for topic mytopic ->
> No partition metadata for topic mytopic due to
> kafka.common.LeaderNotAvailableException}] for topic [mytopic]: class
> kafka.common.LeaderNotAvailableException
> 2014-03-30 12:28:35 ERROR DefaultEventHandler:97 - Failed to collate
> messages by topic, partition due to: Failed to fetch topic metadata for
> topic: mytopic
> 
> 
> Does anyone know what is going on. I went through these pains converting to
> 0.8.0-betas and I was hoping to be done dealing with this :)
>