Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/04/26 12:38:27 UTC

[GitHub] vvraskin closed pull request #3325: Refactor Kafka clients for more generalized configuration reading.

URL: https://github.com/apache/incubator-openwhisk/pull/3325

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 20d56356bf..16cbe35415 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -17,8 +17,6 @@
 
 package whisk.connector.kafka
 
-import java.util.Properties
-
 import akka.actor.ActorSystem
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
 import org.apache.kafka.common.TopicPartition
@@ -26,10 +24,10 @@ import org.apache.kafka.common.errors.{RetriableException, WakeupException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import pureconfig.loadConfigOrThrow
 import whisk.common.{Logging, LoggingMarkers, MetricEmitter, Scheduler}
+import whisk.connector.kafka.KafkaConfiguration._
 import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
 
-import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList}
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.concurrent.{blocking, ExecutionContext, Future}
@@ -67,7 +65,7 @@ class KafkaConsumerConnector(
     val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
 
     try {
-      val response = consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+      val response = consumer.poll(duration.toMillis).asScala.map(r => (r.topic, r.partition, r.offset, r.value))
       response.lastOption.foreach {
         case (_, _, newOffset, _) => offset = newOffset + 1
       }
@@ -112,40 +110,28 @@ class KafkaConsumerConnector(
     logging.info(this, s"closing '$topic' consumer")
   }
 
-  private def getProps: Properties = {
-    val props = new Properties
-    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid)
-    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahost)
-    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPeek.toString)
-
-    val config =
-      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
-        KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
-    config.foreach {
-      case (key, value) => props.put(key, value)
-    }
-    props
-  }
-
   /** Creates a new kafka consumer and subscribes to topic list if given. */
-  private def getConsumer(props: Properties, topics: Option[List[String]] = None) = {
-    val keyDeserializer = new ByteArrayDeserializer
-    val valueDeserializer = new ByteArrayDeserializer
-    val consumer = new KafkaConsumer(props, keyDeserializer, valueDeserializer)
-    topics.foreach(consumer.subscribe(_))
+  private def createConsumer(topic: String) = {
+    val config = Map(
+      ConsumerConfig.GROUP_ID_CONFIG -> groupid,
+      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahost,
+      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> maxPeek.toString) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaConsumer))
+
+    val consumer = new KafkaConsumer(config, new ByteArrayDeserializer, new ByteArrayDeserializer)
+    consumer.subscribe(Seq(topic).asJavaCollection)
     consumer
   }
 
   private def recreateConsumer(): Unit = {
     val oldConsumer = consumer
-    Future {
-      oldConsumer.close()
-      logging.info(this, s"old consumer closed")
-    }
-    consumer = getConsumer(getProps, Some(List(topic)))
+    oldConsumer.close()
+    logging.info(this, s"old consumer closed")
+    consumer = createConsumer(topic)
   }
 
-  @volatile private var consumer = getConsumer(getProps, Some(List(topic)))
+  @volatile private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = createConsumer(topic)
 
  // Read the current lag of the consumed topic, e.g. the invoker queue.
  // Since we use only one partition in Kafka, the partition is fixed at 0.
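
For context on the consumer-side change above: the PR drops the mutable
Properties-building getProps helper in favor of an immutable Map handed to
the KafkaConsumer constructor (via the implicit Map-to-Properties view added
in KafkaConfiguration below), and replaces the implicit JavaConversions with
explicit JavaConverters calls such as .asScala on poll results. A minimal
standalone sketch of that construction pattern follows; the group id,
bootstrap servers, topic, and poll timeout are hypothetical placeholders, not
OpenWhisk's actual values:

import java.util.Properties

import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer

import scala.collection.JavaConverters._

object ConsumerSketch {
  def main(args: Array[String]): Unit = {
    // Build the client configuration as an immutable Map first ...
    val config = Map(
      ConsumerConfig.GROUP_ID_CONFIG -> "example-group", // placeholder
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092", // placeholder
      ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "50")

    // ... then flatten it into Properties once, at the construction site.
    val props = new Properties()
    config.foreach { case (k, v) => props.setProperty(k, v) }

    val consumer = new KafkaConsumer(props, new ByteArrayDeserializer, new ByteArrayDeserializer)
    consumer.subscribe(Seq("example-topic").asJavaCollection) // placeholder topic

    // Explicit JavaConverters instead of the deprecated implicit JavaConversions:
    val records = consumer.poll(500L).asScala.map(r => (r.topic, r.partition, r.offset))
    println(s"polled ${records.size} records")
    consumer.close()
  }
}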
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 6b0fc1415e..e939a4637e 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -20,22 +20,16 @@ package whisk.connector.kafka
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 
-import scala.concurrent.duration.FiniteDuration
 import akka.actor.ActorSystem
-
-import scala.concurrent.duration._
-import scala.collection.JavaConverters._
-import org.apache.kafka.clients.admin.AdminClientConfig
-import org.apache.kafka.clients.admin.AdminClient
-import org.apache.kafka.clients.admin.NewTopic
+import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
 import org.apache.kafka.common.errors.TopicExistsException
-import whisk.common.Logging
-import whisk.core.ConfigKeys
-import whisk.core.WhiskConfig
-import whisk.core.connector.MessageConsumer
-import whisk.core.connector.MessageProducer
-import whisk.core.connector.MessagingProvider
 import pureconfig._
+import whisk.common.Logging
+import whisk.core.{ConfigKeys, WhiskConfig}
+import whisk.core.connector.{MessageConsumer, MessageProducer, MessagingProvider}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration.FiniteDuration
 
 case class KafkaConfig(replicationFactor: Short)
 
@@ -43,6 +37,7 @@ case class KafkaConfig(replicationFactor: Short)
  * A Kafka based implementation of MessagingProvider
  */
 object KafkaMessagingProvider extends MessagingProvider {
+  import KafkaConfiguration._
 
   def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
     implicit logging: Logging,
@@ -56,18 +51,10 @@ object KafkaMessagingProvider extends MessagingProvider {
     val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
     val tc = KafkaConfiguration.configMapToKafkaConfig(
       loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
-    val props = new Properties
-    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
-
-    val commonConfig =
-      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
-    commonConfig.foreach {
-      case (key, value) => {
-        props.put(key, value)
-      }
-    }
 
-    val client = AdminClient.create(props)
+    val baseConfig = Map(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG -> config.kafkaHosts)
+    val commonConfig = configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
+    val client = AdminClient.create(baseConfig ++ commonConfig)
     val numPartitions = 1
     val nt = new NewTopic(topic, numPartitions, kc.replicationFactor).configs(tc.asJava)
     val results = client.createTopics(List(nt).asJava)
@@ -89,9 +76,24 @@ object KafkaMessagingProvider extends MessagingProvider {
 }
 
 object KafkaConfiguration {
-  def configToKafkaKey(configKey: String) = configKey.replace("-", ".")
+  import scala.language.implicitConversions
+
+  implicit def mapToProperties(map: Map[String, String]): Properties = {
+    val props = new Properties()
+    map.foreach { case (key, value) => props.setProperty(key, value) }
+    props
+  }
+
+  /**
+   * Converts a TypesafeConfig key to a Kafka config key.
+   *
+   * TypesafeConfig keys are usually kebab-cased (dash-delimited), whereas Kafka config keys are dot-delimited. This
+   * converts an example-key-to-illustrate to example.key.to.illustrate.
+   */
+  def configToKafkaKey(configKey: String): String = configKey.replace("-", ".")
 
-  def configMapToKafkaConfig(configMap: Map[String, String]) = configMap.map {
+  /** Converts a Map read from TypesafeConfig to a Map to be read by Kafka clients. */
+  def configMapToKafkaConfig(configMap: Map[String, String]): Map[String, String] = configMap.map {
     case (key, value) => configToKafkaKey(key) -> value
   }
 }
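
The KafkaConfiguration helpers above are the heart of the refactoring:
kebab-cased keys read by pureconfig from TypesafeConfig are rewritten to the
dot-delimited names Kafka clients expect, and the implicit view lets a plain
Scala Map be passed wherever java.util.Properties is required (which is what
lets AdminClient.create(baseConfig ++ commonConfig) above compile). A
self-contained sketch of the same technique; the example keys and values are
hypothetical, not taken from OpenWhisk's configuration:

import java.util.Properties

import scala.language.implicitConversions

object KafkaConfigurationSketch {
  // Kebab-cased TypesafeConfig key -> dot-delimited Kafka client key.
  def configToKafkaKey(configKey: String): String = configKey.replace("-", ".")

  def configMapToKafkaConfig(configMap: Map[String, String]): Map[String, String] =
    configMap.map { case (key, value) => configToKafkaKey(key) -> value }

  // Implicit view: use a Scala Map where java.util.Properties is expected.
  implicit def mapToProperties(map: Map[String, String]): Properties = {
    val props = new Properties()
    map.foreach { case (key, value) => props.setProperty(key, value) }
    props
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical keys as they might appear kebab-cased in application.conf:
    val fromConfig = Map("session-timeout-ms" -> "30000", "max-poll-records" -> "50")
    val kafkaConfig = configMapToKafkaConfig(fromConfig)
    println(kafkaConfig) // Map(session.timeout.ms -> 30000, max.poll.records -> 50)

    val props: Properties = kafkaConfig // the implicit view kicks in here
    println(props.getProperty("session.timeout.ms")) // 30000
  }
}

The implicit view keeps call sites terse at the cost of enabling
scala.language.implicitConversions; an explicit .toProperties helper would be
the more conservative alternative.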
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index 125dbef604..f82acaf85d 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -17,20 +17,14 @@
 
 package whisk.connector.kafka
 
-import java.util.Properties
-
 import akka.actor.ActorSystem
 import akka.pattern.after
 import org.apache.kafka.clients.producer._
-import org.apache.kafka.common.errors.{
-  NotEnoughReplicasAfterAppendException,
-  RecordTooLargeException,
-  RetriableException,
-  TimeoutException
-}
+import org.apache.kafka.common.errors._
 import org.apache.kafka.common.serialization.StringSerializer
 import pureconfig._
 import whisk.common.{Counter, Logging, TransactionId}
+import whisk.connector.kafka.KafkaConfiguration._
 import whisk.core.ConfigKeys
 import whisk.core.connector.{Message, MessageProducer}
 import whisk.core.entity.UUIDs
@@ -97,35 +91,20 @@ class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID()
 
   private val sentCounter = new Counter()
 
-  private def getProps: Properties = {
-    val props = new Properties
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkahosts)
-
-    // Load additional config from the config files and add them here.
-    val config =
-      KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
-        KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
-
-    config.foreach {
-      case (key, value) => props.put(key, value)
-    }
-    props
-  }
+  private def createProducer(): KafkaProducer[String, String] = {
+    val config = Map(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> kafkahosts) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon)) ++
+      configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaProducer))
 
-  private def getProducer(props: Properties): KafkaProducer[String, String] = {
-    val keySerializer = new StringSerializer
-    val valueSerializer = new StringSerializer
-    new KafkaProducer(props, keySerializer, valueSerializer)
+    new KafkaProducer(config, new StringSerializer, new StringSerializer)
   }
 
   private def recreateProducer(): Unit = {
     val oldProducer = producer
-    Future {
-      oldProducer.close()
-      logging.info(this, s"old consumer closed")
-    }
-    producer = getProducer(getProps)
+    oldProducer.close()
+    logging.info(this, s"old producer closed")
+    producer = createProducer()
   }
 
-  @volatile private var producer = getProducer(getProps)
+  @volatile private var producer = createProducer()
 }
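
One behavioral change in both connectors is easy to miss in the diff: the old
client used to be closed inside a fire-and-forget Future (and the producer
variant even logged "old consumer closed"), whereas after this PR the close
happens synchronously before the replacement is created. A minimal sketch of
the resulting pattern; the ProducerHolder name and bootstrap address are
hypothetical, for illustration only:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

class ProducerHolder(bootstrapServers: String) {
  private def createProducer(): KafkaProducer[String, String] = {
    val props = new Properties()
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    new KafkaProducer(props, new StringSerializer, new StringSerializer)
  }

  // @volatile: threads reading `producer` always observe the latest swap.
  @volatile private var producer = createProducer()

  def recreateProducer(): Unit = {
    val oldProducer = producer
    // Close synchronously (flushing buffered records) before creating the
    // replacement, instead of closing in a detached Future as before.
    oldProducer.close()
    producer = createProducer()
  }
}

// Usage: new ProducerHolder("localhost:9092").recreateProducer()

Closing synchronously removes the window in which the old and the new
producer are alive at the same time, at the cost of blocking the caller while
buffered records are flushed.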


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services