You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ch...@apache.org on 2018/08/06 08:22:31 UTC

[incubator-openwhisk] branch master updated: Synchronize access to the KafkaConsumer. (#3921)

This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 23e9268  Synchronize access to the KafkaConsumer. (#3921)
23e9268 is described below

commit 23e926885afb75e6b8057ff191df6b175451a656
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Mon Aug 6 10:22:28 2018 +0200

    Synchronize access to the KafkaConsumer. (#3921)
    
    KafkaConsumer is not thread-safe and hence must not be accessed concurrently. This commit enforces that by synchronizing access to the consumer instance. The cost of synchronization should be relatively low, because there is little contention between the monitoring code (which runs at a default interval of 60 seconds) and the code fetching new entries.
---
 .../connector/kafka/KafkaConsumerConnector.scala     | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index bc88907..1f2ea2b 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -74,7 +74,7 @@ class KafkaConsumerConnector(
     }
 
     try {
-      val response = consumer.poll(duration).asScala
+      val response = synchronized(consumer.poll(duration)).asScala
 
       // Cancel the scheduled wake-up task immediately.
       wakeUpTask.cancel()
@@ -120,7 +120,7 @@ class KafkaConsumerConnector(
    */
   def commit(retry: Int = 3): Unit =
     try {
-      consumer.commitSync()
+      synchronized(consumer.commitSync())
     } catch {
       case e: RetriableException =>
         if (retry > 0) {
@@ -132,7 +132,7 @@ class KafkaConsumerConnector(
         }
     }
 
-  override def close(): Unit = {
+  override def close(): Unit = synchronized {
     logging.info(this, s"closing consumer for '$topic'")
     consumer.close()
   }
@@ -149,11 +149,14 @@ class KafkaConsumerConnector(
     verifyConfig(config, ConsumerConfig.configNames().asScala.toSet)
 
     val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
+
+    // subscribe does not need to be synchronized, because the reference to the consumer hasn't been returned yet and
+    // thus this is guaranteed only to be called by the calling thread.
     consumer.subscribe(Seq(topic).asJavaCollection)
     consumer
   }
 
-  private def recreateConsumer(): Unit = {
+  private def recreateConsumer(): Unit = synchronized {
     logging.info(this, s"recreating consumer for '$topic'")
     try {
       consumer.close()
@@ -182,10 +185,11 @@ class KafkaConsumerConnector(
       blocking {
         if (offset > 0) {
           val topicAndPartition = new TopicPartition(topic, 0)
-          consumer.endOffsets(Set(topicAndPartition).asJava).asScala.get(topicAndPartition).foreach { endOffset =>
-            // endOffset could lag behind the offset reported by the consumer internally resulting in negative numbers
-            val queueSize = (endOffset - offset).max(0)
-            MetricEmitter.emitHistogramMetric(queueMetric, queueSize)
+          synchronized(consumer.endOffsets(Set(topicAndPartition).asJava)).asScala.get(topicAndPartition).foreach {
+            endOffset =>
+              // endOffset could lag behind the offset reported by the consumer internally resulting in negative numbers
+              val queueSize = (endOffset - offset).max(0)
+              MetricEmitter.emitHistogramMetric(queueMetric, queueSize)
           }
         }
       }