You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/02/25 21:34:00 UTC

[jira] [Commented] (KAFKA-2704) SimpleConsumer should throw InterruptedException when interrupted

    [ https://issues.apache.org/jira/browse/KAFKA-2704?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16376258#comment-16376258 ] 

ASF GitHub Bot commented on KAFKA-2704:
---------------------------------------

hachikuji closed pull request #810: KAFKA-2704:Make SimpleConsumer threadsafe
URL: https://github.com/apache/kafka/pull/810
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub no longer
shows its diff once it has been merged, so the diff is supplied below:

diff --git a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
index e15aca4b4ba..2d5ecb6c4e7 100644
--- a/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
@@ -17,7 +17,7 @@
 
 package kafka.consumer
 
-
+import java.io.IOException
 import java.nio.channels.{AsynchronousCloseException, ClosedByInterruptException}
 import java.util.concurrent.TimeUnit
 
@@ -42,22 +42,31 @@ class SimpleConsumer(val host: String,
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)
   private val fetchRequestAndResponseStats = FetchRequestAndResponseStatsRegistry.getFetchRequestAndResponseStats(clientId)
-  private var isClosed = false
 
+  @nonthreadsafe
+  private def isConnected = blockingChannel.isConnected
+
+  @nonthreadsafe
   private def connect(): BlockingChannel = {
-    close
-    blockingChannel.connect()
+    if (!isConnected) {
+      blockingChannel.connect()
+    }
     blockingChannel
   }
-
+ 
+ @nonthreadsafe
   private def disconnect() = {
     debug("Disconnecting from " + formatAddress(host, port))
     blockingChannel.disconnect()
   }
 
   private def reconnect() {
-    disconnect()
-    connect()
+    lock synchronized {
+      if (isConnected) {
+        disconnect()
+      }
+      connect()
+    }
   }
 
   /**
@@ -70,18 +79,20 @@ class SimpleConsumer(val host: String,
     disconnect()
   }
 
+  @throws[IOException]
   def close() {
     lock synchronized {
       disconnect()
-      isClosed = true
     }
   }
   
+  @throws[IOException]
+  @throws[InterruptedException]
   private def sendRequest(request: RequestOrResponse): NetworkReceive = {
     lock synchronized {
       var response: NetworkReceive = null
       try {
-        getOrMakeConnection()
+        connect()
         blockingChannel.send(request)
         response = blockingChannel.receive()
       } catch {
@@ -107,11 +118,15 @@ class SimpleConsumer(val host: String,
     }
   }
 
+  @throws[IOException]
+  @throws[InterruptedException]
   def send(request: TopicMetadataRequest): TopicMetadataResponse = {
     val response = sendRequest(request)
     TopicMetadataResponse.readFrom(response.payload())
   }
 
+  @throws[IOException]
+  @throws[InterruptedException]
   def send(request: GroupCoordinatorRequest): GroupCoordinatorResponse = {
     val response = sendRequest(request)
     GroupCoordinatorResponse.readFrom(response.payload())
@@ -123,6 +138,8 @@ class SimpleConsumer(val host: String,
    *  @param request  specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
    *  @return a set of fetched messages
    */
+  @throws[IOException]
+  @throws[InterruptedException]
   def fetch(request: FetchRequest): FetchResponse = {
     var response: NetworkReceive = null
     val specificTimer = fetchRequestAndResponseStats.getFetchRequestAndResponseStats(host, port).requestTimer
@@ -146,6 +163,8 @@ class SimpleConsumer(val host: String,
    *  @param request a [[kafka.api.OffsetRequest]] object.
    *  @return a [[kafka.api.OffsetResponse]] object.
    */
+  @throws[IOException]
+  @throws[InterruptedException]
   def getOffsetsBefore(request: OffsetRequest) = OffsetResponse.readFrom(sendRequest(request).payload())
 
   /**
@@ -154,6 +173,8 @@ class SimpleConsumer(val host: String,
    * @param request a [[kafka.api.OffsetCommitRequest]] object.
    * @return a [[kafka.api.OffsetCommitResponse]] object.
    */
+  @throws[IOException]
+  @throws[InterruptedException]
   def commitOffsets(request: OffsetCommitRequest) = {
     // TODO: With KAFKA-1012, we have to first issue a ConsumerMetadataRequest and connect to the coordinator before
     // we can commit offsets.
@@ -166,13 +187,10 @@ class SimpleConsumer(val host: String,
    * @param request a [[kafka.api.OffsetFetchRequest]] object.
    * @return a [[kafka.api.OffsetFetchResponse]] object.
    */
+  @throws[IOException]
+  @throws[InterruptedException]
   def fetchOffsets(request: OffsetFetchRequest) = OffsetFetchResponse.readFrom(sendRequest(request).payload())
 
-  private def getOrMakeConnection() {
-    if(!isClosed && !blockingChannel.isConnected) {
-      connect()
-    }
-  }
 
   /**
    * Get the earliest or latest offset of a given topic, partition.
@@ -181,6 +199,8 @@ class SimpleConsumer(val host: String,
    * @param consumerId Id of the consumer which could be a consumer client, SimpleConsumerShell or a follower broker.
    * @return Requested offset.
    */
+  @throws[IOException]
+  @throws[InterruptedException]
   def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = {
     val request = OffsetRequest(requestInfo = Map(topicAndPartition -> PartitionOffsetRequestInfo(earliestOrLatest, 1)),
                                 clientId = clientId,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> SimpleConsumer should throw InterruptedException when interrupted
> -----------------------------------------------------------------
>
>                 Key: KAFKA-2704
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2704
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.8.2.1
>            Reporter: Hitoshi Ozawa
>            Priority: Major
>         Attachments: simpleconsumer_2.patch
>
>
> SimpleConsumer does not throw InterruptedException when interrupted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)