Posted to dev@kafka.apache.org by "aarti gupta (JIRA)" <ji...@apache.org> on 2014/09/29 19:50:33 UTC

[jira] [Commented] (KAFKA-1657) Fetch request using Simple consumer fails due to failed due to Leader not local for partition

    [ https://issues.apache.org/jira/browse/KAFKA-1657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14151952#comment-14151952 ] 

aarti gupta commented on KAFKA-1657:
------------------------------------

To elaborate on the expected usage:

I want to specify the IP address and port of any node in the Kafka cluster (instead of the dynamic args[1] above).
Irrespective of whether that node is the leader for the partition the consumer is interested in (args[0]), the seed broker should be sufficient to discover the other nodes in the cluster and return the leader for the partition.

Is this the correct interpretation of the SimpleConsumer documentation here? https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
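
The expected lookup can be sketched without any Kafka dependency. This is only an illustration under stated assumptions: MetadataSource is a hypothetical stand-in for a topic metadata request sent to one broker (it is not a Kafka API), and the host:port values are made up.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LeaderLookupSketch {

    /** Hypothetical stand-in for a metadata request to a single broker; not a Kafka API. */
    public interface MetadataSource {
        /** Returns partition id -> leader "host:port", or throws if the broker cannot be reached. */
        Map<Integer, String> partitionLeaders(String broker, String topic) throws Exception;
    }

    /**
     * Asks each seed broker in turn for the topic metadata and returns the leader
     * of the requested partition, or null if no seed could provide one.
     */
    public static String findLeader(List<String> seedBrokers, String topic, int partition,
                                    MetadataSource source) {
        for (String seed : seedBrokers) {
            try {
                String leader = source.partitionLeaders(seed, topic).get(partition);
                if (leader != null) {
                    return leader;
                }
            } catch (Exception e) {
                // This seed did not answer; fall through to the next one.
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Assumed layout: one broker leads all three partitions, as in the describe output below.
        final Map<Integer, String> leaders = new HashMap<>();
        leaders.put(0, "localhost:9092");
        leaders.put(1, "localhost:9092");
        leaders.put(2, "localhost:9092");

        // Fake metadata source: the first seed is unreachable, every other broker answers.
        MetadataSource source = (broker, topic) -> {
            if (broker.equals("localhost:9999")) {
                throw new java.io.IOException("unreachable");
            }
            return leaders;
        };

        String leader = findLeader(Arrays.asList("localhost:9999", "localhost:9093"), "VCCTask", 1, source);
        System.out.println("Fetch from " + leader); // prints "Fetch from localhost:9092"
    }
}
```

The point of the sketch is that the seed is only used to obtain metadata; the fetch itself is then sent to whichever broker the metadata names as leader.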


> Fetch request using Simple consumer fails due to failed due to Leader not local for partition
> ---------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1657
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1657
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.1.1
>            Reporter: aarti gupta
>
> I have a three-node Kafka cluster running on the same physical machine (on different ports),
>  with replication factor = 3, and a single topic with 3 partitions.
> Multiple producers write to the topic, and a custom partitioner directs messages to a given partition.
> I use the simple consumer to read from a given partition of the topic, and have three instances of my consumer running.
> The code snippet for the simple consumer suggests that any node in the cluster (not necessarily the leader for that partition) is sufficient to find the leader for the partition. However, when I run this with a different node in the cluster, a null pointer exception is thrown, and the logs show the error:
> [2014-09-28 20:40:20,984] WARN [KafkaApi-1] Fetch request with correlation id 0 from client testClient on partition [VCCTask,1] failed due to Leader not local for partition [VCCTask,1] on broker 1 (kafka.server.KafkaApis)
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic VCCTask
> Topic:VCCTask	PartitionCount:3	ReplicationFactor:3	Configs:
> 	Topic: VCCTask	Partition: 0	Leader: 1	Replicas: 2,3,1	Isr: 1,2,3
> 	Topic: VCCTask	Partition: 1	Leader: 1	Replicas: 3,1,2	Isr: 1,2,3
> 	Topic: VCCTask	Partition: 2	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
> If I specify the leader for the partition instead of any node in the cluster, everything works, but this is an operational nightmare.
> I was able to reproduce this using a simple test, where a producer writes numbers from 1 to 999999 and the consumers consume from a specific partition.
> Here are the code snippets:
> public class TestConsumerStoreOffsetZookeeper {
>     public static void main(String[] args) throws JSONException {
>         TestConsumerStoreOffsetZookeeper testConsumer = new TestConsumerStoreOffsetZookeeper();
>         JSONObject jsonObject = new JSONObject();
>         jsonObject.put("topicName", "VCCTask");
>         jsonObject.put("clientName", "testClient");
>         jsonObject.put("partition", args[0]);
>         jsonObject.put("hostPort", "172.16.78.171");
>         jsonObject.put("znodeName", "VCCTask");
>         jsonObject.put("port", args[1]);
>         testConsumer.initialize(jsonObject);
>         final long startTime = System.currentTimeMillis();
>         testConsumer.startReceiving(new FutureCallback<byte[]>() {
>             int noOfMessagesConsumed= 0;
>             @Override
>             public void onSuccess(byte[] result) {
>                 LOG.info("YES!! " + ByteBuffer.wrap(result).getLong());
>                 ++noOfMessagesConsumed;
>                 LOG.info("# Messages consumed " + noOfMessagesConsumed + ", time elapsed " + (System.currentTimeMillis() - startTime) / 1000 + " seconds");
>             }
>             @Override
>             public void onFailure(Throwable t) {
>                 LOG.info("NO!! " + t.fillInStackTrace().getMessage());
>             }
>         });
>     }
>     private String topicToRead;
>     private static Logger LOG = Logger.getLogger("TestConsumerStoreOffsetZookeeper");
>     List<String> seedBrokers = Lists.newArrayList("localhost");
>     private int port;
>     private SimpleConsumer consumer;
>     Integer partition;
>     String clientName;
>     private Broker currentLeader;
>     private String counter;
>     CuratorFramework zooKeeper;
>     public void startReceiving(final FutureCallback<byte[]> futureCallback) {
>         findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead, partition);
>         LOG.info("Kafka consumer delegate listening on topic " + topicToRead + " and partition " + partition);
>         int numErrors = 0;
>         while (true) {
>             long latestOffset = 0;
>             Stat stat = null;
>             final String path = "/" + topicToRead + "/"+partition;
>             try {
>                 //************************Read top of the
>                 stat = zooKeeper.checkExists().forPath(path);
>                 if (stat == null) {
>                     latestOffset = getLastOffsetFromBeginningOfStream(this.consumer, topicToRead, partition, OffsetRequest.EarliestTime(), clientName);
>                     byte b[] = new byte[8];
>                     ByteBuffer byteBuffer = ByteBuffer.wrap(b);
>                     byteBuffer.putLong(latestOffset);
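>                     // Pass the encoded offset as the znode's initial data; forPath(path) alone would ignore the buffer built above
>                     final String s = zooKeeper.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE).forPath(path, b);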
>                     LOG.info(" Zookeeper create string is "+ s);
>                     stat = zooKeeper.checkExists().forPath(path);
>                     if (stat == null) {
>                         LOG.info("Stat was null");
>                         throw new RuntimeException("Stat in zookeeper was null, cannot continue as message stream cannot be persisted");
>                     }
>                 } else {
>                     final byte[] data = zooKeeper.getData().storingStatIn(stat).forPath(path);
>                     if (data.length > 0) {
>                         latestOffset = ByteBuffer.wrap(data).getLong();
>                     } else {
>                         latestOffset = getLastOffsetFromBeginningOfStream(this.consumer, topicToRead, partition, OffsetRequest.EarliestTime(), clientName);
>                     }
>                 }
>             } catch (Exception e) {
>                 throw new RuntimeException(e.fillInStackTrace().getMessage());
>             }
>             LOG.info("Topic name is " + topicToRead);
>             LOG.info("Last offset is " + latestOffset);
>             LOG.info("Constructing new fetch request on  " + topicToRead + " from offset" + latestOffset);
>             FetchRequest request = new FetchRequestBuilder().clientId(clientName).addFetch(topicToRead, partition, latestOffset, 100000).build();
>             FetchResponse fetchResponse = consumer.fetch(request);
>             if (fetchResponse.hasError()) {
>                 numErrors++;
>                 final short code = fetchResponse.errorCode(topicToRead, partition);
>                 LOG.info("Error fetching data from broker: " + consumer.host() + " Reason " + code);
>                 if (code == ErrorMapping.OffsetOutOfRangeCode()) {
>                     LOG.info("Offset out of range error: calculating offset again");
>                     throw new RuntimeException("Offset is out of range, multiple consumers are not allowed, this consumer will exit");
>                 }
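>                 // Error code 3 is UnknownTopicOrPartition; a new leader lookup would not help in that case
>                 if (numErrors > 5 && code != ErrorMapping.UnknownTopicOrPartitionCode()) {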
>                     consumer.close();
>                     consumer = null;
>                     findLeaderAndUpdateSelfPointers(seedBrokers, port, topicToRead, partition);
>                     numErrors = 0;
>                 }
>                 continue;
>             }
>             final ByteBufferMessageSet messageAndOffsets = fetchResponse.messageSet(topicToRead, partition);
>             final int validBytes = messageAndOffsets.validBytes();
>             LOG.info("Received fetch response on topic  " + topicToRead + " from offset" + latestOffset + " fetch response valid bytes is " + validBytes);
>             try {
>                 if (validBytes == 0) {
>                     LOG.info("No message received");
>                     //Don't keep hammering Kafka
>                     Thread.sleep(1000);
>                     continue;
>                 }
>                 for (MessageAndOffset messageAndOffset : messageAndOffsets) {
>                     LOG.info("Processing offset");
>                     final long currentOffset = messageAndOffset.offset();
>                     LOG.info("Processing offset " + currentOffset);
>                     //in case of compression entire block may be received
>                     if (currentOffset < latestOffset) {
>                         LOG.info("Found an old offset: " + currentOffset + "Expecting:" + latestOffset);
>                         continue;
>                     }
>                     final ByteBuffer payload = messageAndOffset.message().payload();
>                     byte[] bytes = new byte[payload.limit()];
>                     payload.get(bytes);
>                     LOG.info(this.getClass().getName() + " Received message from offset " + currentOffset + ": " + new String(bytes, "UTF-8"));
>                     LOG.info(this.getClass().getName() + " Executing future callback");
>                     //TODO ***************this should be atomic with writing job in db***********************
>                     futureCallback.onSuccess(bytes);
>                     try {
>                         long nextOffset = messageAndOffset.nextOffset();
>                         incrementOffset(nextOffset, stat, path);
>                     } catch (KeeperException | InterruptedException e) {
>                         LOG.info("Encountered exception in writing to" + e.fillInStackTrace().getMessage());
>                     }
>                     //****************************************************************************************
>                 }
>                 LOG.info("Outside for loop");
>             } catch (Exception e1) {
>                 LOG.info("Error in processing message or running callback " + e1.getMessage());
>                 futureCallback.onFailure(e1);
>                 throw new RuntimeException(e1);
>             }
>         }
>     }
>     private void incrementOffset(long nextOffset, Stat stat, String path) throws Exception {
>         if (stat == null) {
>             throw new RuntimeException("Given stat was null");
>         }
>         byte b[] = new byte[8];
>         ByteBuffer byteBuffer = ByteBuffer.wrap(b);
>         byteBuffer.putLong(nextOffset);
>         LOG.info("Offset consumed successfully: Setting offset in zookeeper as next offset: "+ nextOffset);
>         final Stat statWrite = zooKeeper.setData().forPath(path, b);
>         if(statWrite.getDataLength() ==0){
>             throw new RuntimeException("Unable to save offset in zookeeper");
>         }
>     }
>     //TODO: agupta adapters should not have an initialize method, rename and merge with startListening
>     public void initialize(JSONObject configData) {
>         try {
>             final String hostPort = configData.getString("hostPort");
>             zooKeeper = CuratorFrameworkFactory.newClient(hostPort,new ExponentialBackoffRetry(10, 3000));
>             zooKeeper.start();
>             this.counter = configData.getString("znodeName");
>             this.topicToRead = configData.getString("topicName");
>             LOG.info("Topic name is " + topicToRead);
>             //TODO: agupta: read seedbrokers from zookeeper
>             //*ZkClient zkClient = new ZkClient("localhost:2108", 4000, 6000, new BytesPushThroughSerializer());
>             //List<String> brokerList = zkClient.getChildren("/brokers/ips");
>             List<String> seedBrokers = Lists.newArrayList("localhost");
>             this.seedBrokers = seedBrokers;
>             this.port = configData.getInt("port");
>             this.partition= configData.getInt("partition");
>             this.clientName = configData.getString("clientName");
>             LOG.info("Finding leader for partition " + partition + " clientName " + clientName);
>         } catch (JSONException | IOException e) {
>             e.printStackTrace();
>             LOG.info("Error parsing configuration" + e.getMessage());
>         } catch (Exception e) {
>             LOG.info("Error starting zookeeper" + e.getMessage());
>         }
>     }
>     /**
>      * Find last offset to define where to start reading if this is the first read
>      *
>      * @param consumer
>      * @param topic
>      * @param partition
>      * @param whichTime
>      * @param clientName
>      * @return
>      */
>     public static long getLastOffsetFromBeginningOfStream(SimpleConsumer consumer, String topic, int partition,
>                                                           long whichTime, String clientName) {
>         TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
>         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
>         requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
>         kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
>         OffsetResponse response = consumer.getOffsetsBefore(request);
>         if (response.hasError()) {
>             System.out.println("Error fetching offset data from the broker. Reason: " + response.errorCode(topic, partition));
>             return 0;
>         }
>         long[] offsets = response.offsets(topic, partition);
>         return offsets[0];
>     }
>     /**
>      * Return the lead broker for a given topic and partition
>      *
>      * @param seedBrokers
>      * @param port
>      * @param topic
>      * @param partition
>      * @return
>      */
>     private PartitionMetadata findLeaderAndUpdateSelfPointers(List<String> seedBrokers, int port, String topic, int partition) {
>         PartitionMetadata returnMetaData = null;
>         loop:
>         for (String seed : seedBrokers) {
>             SimpleConsumer consumer = null;
>             try {
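>                 // NOTE: this assigns the field this.consumer; the local variable `consumer` declared above stays null,
>                 // so consumer.send(req) below throws a NullPointerException, which the catch block only logs.
>                 // The field is then left pointing at the seed broker, and later fetches go to the seed, not the leader.
>                 this.consumer = new SimpleConsumer(seed, port, 100000, 64 * 1024, "leaderLookup");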
>                 List<String> topics = Collections.singletonList(topic);
>                 TopicMetadataRequest req = new TopicMetadataRequest(topics);
>                 kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
>                 List<TopicMetadata> metaData = resp.topicsMetadata();
>                 for (TopicMetadata item : metaData) {
>                     for (PartitionMetadata part : item.partitionsMetadata()) {
>                         if (part.partitionId() == partition) {
>                             returnMetaData = part;
>                             LOG.info("Found leader " + returnMetaData.leader().host());
>                             break loop;
>                         }
>                     }
>                 }
>             } catch (Exception e) {
>                 LOG.info("Error communicating with Broker [" + seed + "] to find Leader for [" + topic
>                         + ", " + partition + "] Reason: " + e);
>             } finally {
>                 if (consumer != null) consumer.close();
>             }
>         }
>         LOG.info("KafkaConsumerDelegate initializing self pointers ");
>         if (returnMetaData != null) {
>             currentLeader = returnMetaData.leader();
>             if (currentLeader != null) {
>                 this.consumer = new SimpleConsumer(currentLeader.host(), currentLeader.port(), 100000, 64 * 1024, clientName);
>             }
>         }
>         LOG.info("KafkaConsumerDelegate: returning metadata");
>         return returnMetaData;
>     }
> *******************************
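
The quoted consumer persists its position in ZooKeeper as the 8 bytes of a long written through a ByteBuffer, and reads it back with getLong(). A minimal, Curator-free sketch of that round trip follows. The class and method names are hypothetical, and the exact-length check is an added assumption (the quoted code only checks for a non-empty payload).

```java
import java.nio.ByteBuffer;

public class OffsetCodecSketch {

    /** Encodes an offset as 8 big-endian bytes (ByteBuffer's default byte order). */
    public static byte[] encode(long offset) {
        return ByteBuffer.allocate(Long.BYTES).putLong(offset).array();
    }

    /** Decodes a stored offset; returns the fallback when the payload is not exactly 8 bytes. */
    public static long decode(byte[] data, long fallback) {
        if (data == null || data.length != Long.BYTES) {
            return fallback;
        }
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] stored = encode(42L);
        System.out.println(decode(stored, -1L));      // prints 42
        System.out.println(decode(new byte[0], -1L)); // prints -1
    }
}
```

Returning a fallback for a payload of the wrong size lets the caller decide to start from the earliest offset instead of decoding unrelated bytes as an offset.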


