You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@kafka.apache.org by Aarti Gupta <aa...@gmail.com> on 2014/09/13 10:01:29 UTC

head() on Kafka stream gives NoSuchMethodError

The following error is thrown when I call KafkaStream.head(), as shown in
the code snippet below:

* WARN -  java.lang.NoSuchMethodError:
kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;*

My use case is that I want to block in the receive() method and, when
anything is published on the topic, return the 'head' of the queue to the
calling method, which processes it.

I do not use partitioning and have a single stream.


import com.google.common.collect.Maps;
import x.x.x.Task;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.consumer.ZookeeperConsumerConnector;
import kafka.message.MessageAndMetadata;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

/**
 * {@link ConsumerDelegate} implementation that consumes from a single Kafka
 * topic through the ZooKeeper-based consumer connector, requesting exactly
 * one stream for that topic.
 *
 * <p>Lifecycle as visible in this class: {@link #initialize} reads the topic
 * name from the supplied JSON, builds the connector and creates the message
 * streams; {@link #receive} then tries to obtain one message from the first
 * stream and wrap it in a {@code Task}; {@link #stop} is not implemented.
 *
 * @author agupta
 */
public class KafkaConsumerDelegate implements ConsumerDelegate {

    // Connector created in initialize(); not referenced again in this class.
    private ConsumerConnector consumerConnector;
    // Topic to consume; set from the "topicName" key of the config JSON.
    private String topicName;
    // NOTE(review): the logger is bound to KafkaProducerDelegate, not this
    // class -- looks like a copy/paste slip; confirm the intended logger name.
    private static Logger LOG =
LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
    // topic -> number of streams requested; populated with a single entry in initialize().
    private final Map<String, Integer> topicCount = Maps.newHashMap();
    // topic -> streams, as returned by createMessageStreams() in initialize().
    private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
    // Streams for the configured topic; re-assigned on every receive() call.
    private List<KafkaStream<byte[], byte[]>> kafkaStreams;


    /**
     * Attempts to fetch one message from the first stream of the configured
     * topic and return it wrapped in a {@code Task}.
     *
     * <p>The fetch is submitted to a newly created single-thread executor and
     * this method waits on the resulting future via {@code get()}.
     *
     * @param consumerConfirms not read anywhere in this method
     * @return a {@code Task} whose {@code getBytes()} returns the message
     *         payload, or {@code null} if any {@link Exception} was caught
     */
    @Override
    public Task receive(final boolean consumerConfirms) {
        try {
            LOG.info("Kafka consumer delegate listening on topic " +
getTopicName());
            kafkaStreams = messageStreams.get(getTopicName());
            // Only one stream was requested per topic, so index 0 is the only stream.
            final KafkaStream<byte[], byte[]> kafkaStream =
kafkaStreams.get(0);
            // NOTE(review): a new executor is created on every call and is never
            // shut down in this class -- confirm whether its thread is leaked.
            return Executors.newSingleThreadExecutor().submit(new
Callable<Task>() {
                @Override
                public Task call() throws Exception {
                    // The statement between the asterisks is the call this thread is
                    // about: it is reported to fail at runtime with NoSuchMethodError.
                    // NOTE(review): head() comes from the Scala side of KafkaStream;
                    // the reply in this thread recommends the Java iterable API
                    // instead. The asterisks are email emphasis, not Java.
                    * final MessageAndMetadata<byte[], byte[]>
messageAndMetadata= kafkaStream.head();*

                        // Expose the raw message payload through the Task interface.
                        final Task message = new Task() {
                            @Override
                            public byte[] getBytes() {
                                return messageAndMetadata.message();

                            }
                        };
                        return message;
                    }
                }).get();

        } catch (Exception e) {
            // Only the exception message is logged; the stack trace is dropped and
            // the failure is reported to the caller as a null return below.
            LOG.warn("Error in consumer " + e.getMessage());
        }
        return null;
    }

    /**
     * Reads the topic name from {@code configData}, builds the consumer
     * configuration and creates the message streams for that topic.
     *
     * <p>The ZooKeeper address ({@code localhost:2181}) and consumer group
     * ({@code testgroup}) are hard-coded here.
     *
     * @param configData JSON configuration; the {@code "topicName"} key is read
     * @param publisherAckMode not read anywhere in this method
     * @throws IOException declared by the signature; nothing in this body
     *         throws it explicitly
     */
    @Override
    public void initialize(JSONObject configData, boolean publisherAckMode)
throws IOException {
        try {
            this.topicName = configData.getString("topicName");
            LOG.info("Topic name is " + topicName);
        } catch (JSONException e) {
            // NOTE(review): the failure is printed and logged but execution
            // continues, so topicName may still be unset when it is used below.
            e.printStackTrace();
            LOG.error("Error parsing configuration", e);
        }
        Properties properties = new Properties();
        properties.put("zookeeper.connect", "localhost:2181");
        properties.put("group.id", "testgroup");
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        //only one stream, and one topic, (Since we are not supporting
partitioning)
        topicCount.put(getTopicName(), 1);
        consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
        messageStreams = consumerConnector.createMessageStreams(topicCount);
    }

    /**
     * Not implemented: always throws {@link UnsupportedOperationException}.
     *
     * @throws IOException declared by the signature; never thrown by this body
     */
    @Override
    public void stop() throws IOException {
//TODO
        throw new UnsupportedOperationException("Method Not Implemented");
    }

    /**
     * @return the topic name stored by {@link #initialize}, or {@code null}
     *         if it has not been set
     */
    public String getTopicName() {
        return this.topicName;
    }
}



Lastly, I am using the following binary

kafka_2.8.0-0.8.1.1

and the following maven dependency

  <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.1.1</version>
        </dependency>


Any suggestions?

Thanks
aarti

Re: head() on Kafka stream gives NoSuchMethodError

Posted by Jun Rao <ju...@gmail.com>.
head() is a Scala method. Calling it from Java requires you to figure out
the exact method signature in the byte code. A simpler way is to use the Java
iterable API in KafkaStream. By default, it blocks on the hasNext()
call when there is no message.

Thanks,

Jun

On Sat, Sep 13, 2014 at 1:01 AM, Aarti Gupta <aa...@gmail.com> wrote:

> The following error is thrown, (when I call KafkaStream.head(), as shown in
> the code snippet below)
>
> * WARN -  java.lang.NoSuchMethodError:
> kafka.consumer.KafkaStream.head()Lkafka/message/MessageAndMetadata;*
>
> My use case, is that I want to block on the receive() method, and when
> anything is published on the topic, I 'head' of the queue to the calling
> method, that processes it.
>
> I do not use partitioning and have a single stream.
>
>
> import com.google.common.collect.Maps;
> import x.x.x.Task;
> import kafka.consumer.ConsumerConfig;
> import kafka.consumer.KafkaStream;
> import kafka.javaapi.consumer.ConsumerConnector;
> import kafka.javaapi.consumer.ZookeeperConsumerConnector;
> import kafka.message.MessageAndMetadata;
> import org.codehaus.jettison.json.JSONException;
> import org.codehaus.jettison.json.JSONObject;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> import java.io.IOException;
> import java.util.List;
> import java.util.Map;
> import java.util.Properties;
> import java.util.concurrent.Callable;
> import java.util.concurrent.Executors;
>
> /**
>  * @author agupta
>  */
> public class KafkaConsumerDelegate implements ConsumerDelegate {
>
>     private ConsumerConnector consumerConnector;
>     private String topicName;
>     private static Logger LOG =
> LoggerFactory.getLogger(KafkaProducerDelegate.class.getName());
>     private final Map<String, Integer> topicCount = Maps.newHashMap();
>     private Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams;
>     private List<KafkaStream<byte[], byte[]>> kafkaStreams;
>
>
>     @Override
>     public Task receive(final boolean consumerConfirms) {
>         try {
>             LOG.info("Kafka consumer delegate listening on topic " +
> getTopicName());
>             kafkaStreams = messageStreams.get(getTopicName());
>             final KafkaStream<byte[], byte[]> kafkaStream =
> kafkaStreams.get(0);
>             return Executors.newSingleThreadExecutor().submit(new
> Callable<Task>() {
>                 @Override
>                 public Task call() throws Exception {
>                     * final MessageAndMetadata<byte[], byte[]>
> messageAndMetadata= kafkaStream.head();*
>
>                         final Task message = new Task() {
>                             @Override
>                             public byte[] getBytes() {
>                                 return messageAndMetadata.message();
>
>                             }
>                         };
>                         return message;
>                     }
>                 }).get();
>
>         } catch (Exception e) {
>             LOG.warn("Error in consumer " + e.getMessage());
>         }
>         return null;
>     }
>
>     @Override
>     public void initialize(JSONObject configData, boolean publisherAckMode)
> throws IOException {
>         try {
>             this.topicName = configData.getString("topicName");
>             LOG.info("Topic name is " + topicName);
>         } catch (JSONException e) {
>             e.printStackTrace();
>             LOG.error("Error parsing configuration", e);
>         }
>         Properties properties = new Properties();
>         properties.put("zookeeper.connect", "localhost:2181");
>         properties.put("group.id", "testgroup");
>         ConsumerConfig consumerConfig = new ConsumerConfig(properties);
>         //only one stream, and one topic, (Since we are not supporting
> partitioning)
>         topicCount.put(getTopicName(), 1);
>         consumerConnector = new ZookeeperConsumerConnector(consumerConfig);
>         messageStreams =
> consumerConnector.createMessageStreams(topicCount);
>     }
>
>     @Override
>     public void stop() throws IOException {
> //TODO
>         throw new UnsupportedOperationException("Method Not Implemented");
>     }
>
>     public String getTopicName() {
>         return this.topicName;
>     }
> }
>
>
>
> Lastly, I am using the following binary
>
> kafka_2.8.0-0.8.1.1
>
> and the following maven dependency
>
>   <dependency>
>             <groupId>org.apache.kafka</groupId>
>             <artifactId>kafka_2.10</artifactId>
>             <version>0.8.1.1</version>
>         </dependency>
>
>
> Any suggestions?
>
> Thanks
> aarti
>