Posted to dev@kafka.apache.org by "Neha Narkhede (JIRA)" <ji...@apache.org> on 2015/01/26 02:25:35 UTC

[jira] [Comment Edited] (KAFKA-1886) SimpleConsumer swallowing ClosedByInterruptException

    [ https://issues.apache.org/jira/browse/KAFKA-1886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14284963#comment-14284963 ] 

Neha Narkhede edited comment on KAFKA-1886 at 1/26/15 1:25 AM:
---------------------------------------------------------------

If anyone is interested, I hacked an existing test for this:

{code}
def testConsumerEmptyTopic(): Unit = {
  val newTopic = "new-topic"
  TestUtils.createTopic(zkClient, newTopic, numPartitions = 1, replicationFactor = 1, servers = servers)
  val thread = new Thread {
    override def run(): Unit = {
      System.out.println("Starting the fetch")
      val start = System.currentTimeMillis()
      try {
        // The topic is empty, so with minBytes = 100000 the fetch should block for up to maxWait (3000 ms).
        consumer.fetch(new FetchRequestBuilder().minBytes(100000).maxWait(3000).addFetch(newTopic, 0, 0, 10000).build())
      } catch {
        case e: Throwable =>
          val end = System.currentTimeMillis()
          System.out.println("Caught exception " + e + ". Took " + (end - start) + " ms")
          // Report whether the interrupt flag is still set once the exception reaches us.
          System.out.println("Fetch interrupted " + Thread.currentThread().isInterrupted)
      }
    }
  }

  thread.start()
  Thread.sleep(1000) // give the fetch time to block before interrupting it
  thread.interrupt()
  thread.join()
  System.out.println("Ending test")
}
{code}
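
For anyone without a broker handy, the underlying JDK behaviour can be checked in isolation. The sketch below is not Kafka code: the object and method names are made up for illustration, and it only assumes a loopback socket is available. It blocks a thread in a SocketChannel read, interrupts it, and records which exception arrives and whether the interrupt flag is set inside the catch block. According to the Javadoc for ClosedByInterruptException, the interrupt status is set before the exception is thrown.

```scala
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{ServerSocketChannel, SocketChannel}
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper, not part of Kafka or Samza.
object InterruptDemo {
  // Returns (exception class name, interrupt flag as seen inside the catch block).
  def blockedReadOutcome(): (String, Boolean) = {
    val server = ServerSocketChannel.open()
    server.bind(new InetSocketAddress("127.0.0.1", 0)) // ephemeral loopback port
    val result = new AtomicReference[(String, Boolean)](("none", false))

    val reader = new Thread {
      override def run(): Unit = {
        val client = SocketChannel.open(server.getLocalAddress)
        try {
          client.read(ByteBuffer.allocate(16)) // blocks: the server side never writes
        } catch {
          case e: Throwable =>
            result.set((e.getClass.getSimpleName, Thread.currentThread().isInterrupted))
        } finally {
          client.close()
        }
      }
    }

    reader.start()
    val accepted = server.accept() // wait until the reader has connected
    Thread.sleep(500)              // give the reader time to block in read()
    reader.interrupt()
    reader.join()
    accepted.close()
    server.close()
    result.get()
  }
}
```

Calling `InterruptDemo.blockedReadOutcome()` should return the exception name together with `true` for the flag, which suggests that catching the exception does not by itself clear the interrupt status.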



> SimpleConsumer swallowing ClosedByInterruptException
> ----------------------------------------------------
>
>                 Key: KAFKA-1886
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1886
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>            Reporter: Aditya A Auradkar
>            Assignee: Jun Rao
>         Attachments: KAFKA-1886.patch
>
>
> This issue was originally reported by a Samza developer. I've included an exchange of mine with Chris Riccomini. I'm trying to reproduce the problem on my dev setup.
> From: criccomi
> Hey all,
> Samza's BrokerProxy [1] threads appear to be wedging randomly when we try to interrupt its fetcher thread. I noticed that SimpleConsumer.scala catches Throwable in its sendRequest method [2]. I'm wondering: if blockingChannel.send/receive throws a ClosedByInterruptException when the thread is interrupted, what happens? It looks like sendRequest will catch the exception (which I think clears the thread's interrupted flag), and then retry the send. If the send succeeds on the retry, I think that the ClosedByInterruptException is effectively swallowed, and the BrokerProxy will continue fetching messages as though its thread was never interrupted.
> Am I misunderstanding how things work?
> Cheers,
> Chris
> [1] https://github.com/apache/incubator-samza/blob/master/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala#L126
> [2] https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/consumer/SimpleConsumer.scala#L75
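
The catch-and-retry pattern described in the report can be sketched without Kafka. This is a minimal illustration, not the actual SimpleConsumer code: `RetrySketch`, `retryOnAnyFailure`, `retryUnlessInterrupted`, `attempt` and `reconnect` are hypothetical names. The first method retries after any Throwable, so a failure caused by an interrupt is handled like an ordinary socket error. The second is one possible alternative that lets interrupt-related exceptions propagate to the caller.

```scala
import java.nio.channels.ClosedByInterruptException

// Hypothetical stand-ins for the send/receive-with-retry logic; not Kafka's API.
object RetrySketch {
  // Retry once after any failure, including one triggered by an interrupt.
  def retryOnAnyFailure[T](attempt: () => T, reconnect: () => Unit): T =
    try attempt()
    catch {
      case _: Throwable =>
        reconnect()
        attempt() // if this succeeds, the caller never learns about the interrupt
    }

  // Retry only for failures that are not interrupt-related.
  def retryUnlessInterrupted[T](attempt: () => T, reconnect: () => Unit): T =
    try attempt()
    catch {
      case e: ClosedByInterruptException => throw e // surface the interrupt
      case e: InterruptedException       => throw e
      case _: Throwable =>
        reconnect()
        attempt()
    }
}
```

With an `attempt` that throws ClosedByInterruptException once and then succeeds, the first variant returns normally while the second rethrows, which is the difference the report is asking about.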



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)