Posted to by "aarti gupta (JIRA)" <> on 2014/09/29 19:42:34 UTC

[jira] [Created] (KAFKA-1657) Fetch request using Simple consumer fails due to failed due to Leader not local for partition

aarti gupta created KAFKA-1657:

             Summary: Fetch request using Simple consumer fails due to failed due to Leader not local for partition
                 Key: KAFKA-1657
             Project: Kafka
          Issue Type: Bug
    Affects Versions:
            Reporter: aarti gupta

I have a three-node Kafka cluster running on a single physical machine (each broker on a different port),
 with replication factor 3 and a single topic with 3 partitions.
Multiple producers write to the topic, and a custom partitioner is used to direct messages to a given partition.
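
The partitioner itself is not shown in this report. As a rough illustration only, a key-hash scheme along the following lines would pin each key to one of the 3 partitions. The class name KeyHashPartitioner, the method partitionFor, and the sample keys are all hypothetical and are not taken from the reporter's code or from the Kafka API.

```java
// Hypothetical sketch: NOT the reporter's partitioner, whose logic is not shown.
final class KeyHashPartitioner {

    /** Maps a message key to a partition index in the range [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // floorMod keeps the result non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        // Sample keys are made up for the demonstration.
        for (String key : new String[] {"job-1", "job-2", "job-3"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

The same key always lands on the same partition, which is what lets a consumer that reads a single partition see every message for that key.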

I use the simple consumer to read from a given partition of the topic, and have three instances of my consumer running.

The code snippet for the simple consumer suggests that any node in the cluster (not necessarily the leader for that partition) is sufficient to find the leader for the partition. However, when I run this against a node other than the leader, a null pointer exception is thrown, and the logs show this error:

[2014-09-28 20:40:20,984] WARN [KafkaApi-1] Fetch request with correlation id 0 from client testClient on partition [VCCTask,1] failed due to Leader not local for partition [VCCTask,1] on broker 1 (kafka.server.KafkaApis)

bin/ --describe --zookeeper localhost:2181 --topic VCCTask
Topic:VCCTask	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: VCCTask	Partition: 0	Leader: 1	Replicas: 2,3,1	Isr: 1,2,3
	Topic: VCCTask	Partition: 1	Leader: 1	Replicas: 3,1,2	Isr: 1,2,3
	Topic: VCCTask	Partition: 2	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
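
To check which broker a consumer should fetch from, the leader column of the output above can be read programmatically. This is a minimal sketch, assuming the "Partition: N ... Leader: M" layout shown above. DescribeOutput and leadersByPartition are hypothetical helper names, not part of Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: parses the text printed by the --describe command above.
final class DescribeOutput {

    // Matches "Partition: 1 <whitespace> Leader: 1"; the header line does not
    // match because it contains "PartitionCount:" rather than "Partition:".
    private static final Pattern PARTITION_LINE =
            Pattern.compile("Partition:\\s*(\\d+)\\s+Leader:\\s*(-?\\d+)");

    /** Returns a map from partition id to leader broker id, in output order. */
    static Map<Integer, Integer> leadersByPartition(String describeOutput) {
        Map<Integer, Integer> leaders = new LinkedHashMap<>();
        for (String line : describeOutput.split("\\r?\\n")) {
            Matcher m = PARTITION_LINE.matcher(line);
            if (m.find()) {
                leaders.put(Integer.parseInt(m.group(1)), Integer.parseInt(m.group(2)));
            }
        }
        return leaders;
    }

    public static void main(String[] args) {
        String output =
                "Topic:VCCTask\tPartitionCount:3\tReplicationFactor:3\tConfigs:\n"
              + "\tTopic: VCCTask\tPartition: 0\tLeader: 1\tReplicas: 2,3,1\tIsr: 1,2,3\n"
              + "\tTopic: VCCTask\tPartition: 1\tLeader: 1\tReplicas: 3,1,2\tIsr: 1,2,3\n"
              + "\tTopic: VCCTask\tPartition: 2\tLeader: 1\tReplicas: 1,2,3\tIsr: 1,2,3\n";
        System.out.println(leadersByPartition(output));
    }
}
```

A fetch for a partition sent to any broker other than the one listed as its leader is the situation that the "Leader not local for partition" warning describes.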

If I specify the leader for the partition instead of an arbitrary node in the cluster, everything works, but this is an operational nightmare.

I was able to reproduce this with a simple test in which a producer writes the numbers 1 to 999999 and each consumer reads from a specific partition.
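
The consumer callback in the listing decodes each payload with ByteBuffer.wrap(result).getLong(), which suggests the test numbers travel as 8-byte big-endian longs. The producer is not shown, so that encoding is an assumption. A minimal sketch of the round trip, with LongCodec as a hypothetical name:

```java
import java.nio.ByteBuffer;

// Hypothetical helper illustrating the assumed payload encoding.
final class LongCodec {

    /** Encodes a long as 8 bytes (ByteBuffer's default order is big-endian). */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Decodes the first 8 bytes of the array back into a long. */
    static long decode(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] payload = encode(999999L);
        System.out.println(payload.length + " bytes, decoded value " + decode(payload));
    }
}
```

The listing stores the consumed offset in ZooKeeper the same way: it writes the next offset into an 8-byte array with putLong and reads it back with getLong.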

Here are the code snippets:

public class TestConsumerStoreOffsetZookeeper {

    public static void main(String[] args) throws JSONException {

        TestConsumerStoreOffsetZookeeper testConsumer = new TestConsumerStoreOffsetZookeeper();
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("topicName", "VCCTask");
        jsonObject.put("clientName", "testClient");
        jsonObject.put("partition", args[0]);
        jsonObject.put("hostPort", "");
        jsonObject.put("znodeName", "VCCTask");
        jsonObject.put("port", args[1]);
        final long startTime = System.currentTimeMillis();
        testConsumer.startReceiving(new FutureCallback<byte[]>() {
            int noOfMessagesConsumed = 0;

            public void onSuccess(byte[] result) {
                LOG.info("YES!! " + ByteBuffer.wrap(result).getLong());
                LOG.info("# Messages consumed " + noOfMessagesConsumed + " Time elapsed" + (System.currentTimeMillis() - startTime) / 1000 + " seconds");
            }

            public void onFailure(Throwable t) {
                LOG.info("NO!! " + t.fillInStackTrace().getMessage());
            }
        });
    }

    private String topicToRead;
    private static Logger LOG = Logger.getLogger("TestConsumerStoreOffsetZookeeper");
    List<String> seedBrokers = Lists.newArrayList("localhost");
    private int port;
    private SimpleConsumer consumer;
    Integer partition;
    String clientName;
    private Broker currentLeader;
    private String counter;
    CuratorFramework zooKeeper;

    public void startReceiving(final FutureCallback<byte[]> futureCallback) {

        findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead, partition);
        LOG.info("Kafka consumer delegate listening on topic " + topicToRead + " and partition " + partition);
        int numErrors = 0;
        while (true) {
            long latestOffset = 0;
            Stat stat = null;
            final String path = "/" + topicToRead + "/"+partition;
            try {
                //************************Read top of the
                stat = zooKeeper.checkExists().forPath(path);
                if (stat == null) {
                    latestOffset = getLastOffsetFromBeginningOfStream(this.consumer, topicToRead, partition, OffsetRequest.EarliestTime(), clientName);
                    byte b[] = new byte[8];
                    ByteBuffer byteBuffer = ByteBuffer.wrap(b);
                    final String s = zooKeeper.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path);
                    LOG.info(" Zookeeper create string is " + s);
                    stat = zooKeeper.checkExists().forPath(path);
                    if (stat == null) {
                        LOG.info("Stat was null");
                        throw new RuntimeException("Stat in zookeeper was null, cannot continue as message stream cannot be persisted");
                    }
                } else {
                    final byte[] data = zooKeeper.getData().storingStatIn(stat).forPath(path);
                    latestOffset = ByteBuffer.wrap(data).getLong();
                    }else {
                        latestOffset = getLastOffsetFromBeginningOfStream(this.consumer,topicToRead,partition,OffsetRequest.EarliestTime(),clientName);

            } catch (Exception e) {
                throw new RuntimeException(e.fillInStackTrace().getMessage());
            }
            LOG.info("Topic name is " + topicToRead);
            LOG.info("Last offset is " + latestOffset);
            LOG.info("Constructing new fetch request on  " + topicToRead + " from offset" + latestOffset);
            FetchRequest request = new FetchRequestBuilder().clientId(clientName).addFetch(topicToRead, partition, latestOffset, 100000).build();
            FetchResponse fetchResponse = consumer.fetch(request);
            if (fetchResponse.hasError()) {
                final short code = fetchResponse.errorCode(topicToRead, partition);
      "Error fetching data from broker: " + + " Reason " + code);
                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
                    LOG.info("Offset out of range error: calculating offset again");
                    throw new RuntimeException("Offset is out of range, multiple consumers are not allowed, this consumer will exit");
                }
                if (numErrors > 5 && code != 3) {
                    consumer = null;
                    findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead, partition);
                    numErrors = 0;
                }
            }
            final ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topicToRead, partition);
            final int validBytes = messageAndOffsets.validBytes();
            LOG.info("Received fetch response on topic  " + topicToRead + " from offset" + latestOffset + " fetch response valid bytes is " + validBytes);
            try {
                if (validBytes == 0) {
                    LOG.info("No message received");
                    //Don't keep hammering Kafka
                }
                for (MessageAndOffset messageAndOffset : messageAndOffsets) {
                    LOG.info("Processing offset");
                    final long currentOffset = messageAndOffset.offset();
                    LOG.info("Processing offset " + currentOffset);
                    //in case of compression entire block may be received
                    if (currentOffset < latestOffset) {
                        LOG.info("Found an old offset: " + currentOffset + "Expecting:" + latestOffset);
                    }
                    final ByteBuffer payload = messageAndOffset.message().payload();
                    byte[] bytes = new byte[payload.limit()];
           + " Received message from offset" + String.valueOf(latestOffset) + new String(bytes, "UTF-8"));
           + " Executing future callback");
                    //TODO ***************this should be atomic with writing job in db***********************
                    try {
                        long nextOffset = messageAndOffset.nextOffset();
                        incrementOffset(nextOffset, stat, path);
                    } catch (KeeperException | InterruptedException e) {
                        LOG.info("Encountered exception in writing to" + e.fillInStackTrace().getMessage());
                    }
                }
                LOG.info("Outside for loop");
            } catch (Exception e1) {
                LOG.info("Error in processing message or running callback " + e1.getMessage());
                throw new RuntimeException(e1);
            }
        }
    }


    private void incrementOffset(long nextOffset, Stat stat, String path) throws Exception {
        if (stat == null) {
            throw new RuntimeException("Given stat was null");
        }
        byte b[] = new byte[8];
        ByteBuffer byteBuffer = ByteBuffer.wrap(b);
        byteBuffer.putLong(nextOffset);
        LOG.info("Offset consumed successfully: Setting offset in zookeeper as next offset: " + nextOffset);
        final Stat statWrite = zooKeeper.setData().forPath(path, b);
        if (statWrite.getDataLength() == 0) {
            throw new RuntimeException("Unable to save offset in zookeeper");
        }
    }


    //TODO: agupta adapters should not have an initialize method, rename and merge with startListening
    public void initialize(JSONObject configData) {
        try {
            final String hostPort = configData.getString("hostPort");
            zooKeeper = CuratorFrameworkFactory.newClient(hostPort,new ExponentialBackoffRetry(10, 3000));
            this.counter = configData.getString("znodeName");
            this.topicToRead = configData.getString("topicName");
            LOG.info("Topic name is " + topicToRead);
            //TODO: agupta: read seedbrokers from zookeeper
            //*ZkClient zkClient = new ZkClient("localhost:2108", 4000, 6000, new BytesPushThroughSerializer());
            //List<String> brokerList = zkClient.getChildren("/brokers/ips");
            List<String> seedBrokers = Lists.newArrayList("localhost");
            this.seedBrokers = seedBrokers;
            this.port = configData.getInt("port");
            this.partition= configData.getInt("partition");
            this.clientName = configData.getString("clientName");
            LOG.info("Finding leader with for partition " + partition + " clientName " + clientName);
        } catch (JSONException | IOException e) {
            LOG.info("Error parsing configuration" + e.getMessage());
        } catch (Exception e) {
            LOG.info("Error starting zookeeper" + e.getMessage());
        }
    }


    /**
     * Find last offset to define where to start reading if this is the first read
     * @param consumer
     * @param topic
     * @param partition
     * @param whichTime
     * @param clientName
     * @return
     */
    public static long getLastOffsetFromBeginningOfStream(SimpleConsumer consumer, String topic, int partition,
                                                          long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);
        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    /**
     * Return the lead broker for a given topic and partition
     * @param seedBrokers
     * @param port
     * @param topic
     * @param partition
     * @return
     */
    private PartitionMetadata findLeaderAndUpdateSelfPointers(List<String> seedBrokers, int port, String topic, int partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                this.consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");

                List<String> topics = Collections.singletonList(topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();

                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == partition) {
                            returnMetaData = part;
                            LOG.info("Found leader " + returnMetaData.leader().host());
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                LOG.info("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
                        + ", " + partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        LOG.info("KafkaConsumerDelegate initializing self pointers ");
        if (returnMetaData != null) {
            currentLeader = returnMetaData.leader();
            if (currentLeader != null) {
                this.consumer = new SimpleConsumer(currentLeader.host(), currentLeader.port(), 100000, 64 * 1024, clientName);
            }
        }
        LOG.info("KafkaConsumerDelegate: returning metadata");
        return returnMetaData;
    }
}

