You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/02/26 09:55:21 UTC

[incubator-openwhisk] branch master updated: Enhance Kafka message provider for resiliency. (#3072)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new a91feb0  Enhance Kafka message provider for resiliency. (#3072)
a91feb0 is described below

commit a91feb013dde79eca2e892f70840157f9b18ccb4
Author: Sang Heon Lee <de...@gmail.com>
AuthorDate: Mon Feb 26 18:55:19 2018 +0900

    Enhance Kafka message provider for resiliency. (#3072)
    
    Co-authored-by: Markus Thömmes <ma...@me.com>
---
 common/scala/src/main/resources/application.conf   |  2 +
 .../connector/kafka/KafkaConsumerConnector.scala   | 90 +++++++++++++++++-----
 .../connector/kafka/KafkaMessagingProvider.scala   | 12 ++-
 .../connector/kafka/KafkaProducerConnector.scala   | 79 +++++++++++--------
 .../whisk/core/connector/MessageConsumer.scala     |  4 +-
 .../whisk/core/connector/MessagingProvider.scala   | 16 ++--
 .../core/database/RemoteCacheInvalidation.scala    |  5 +-
 .../core/loadBalancer/ContainerPoolBalancer.scala  |  2 +-
 .../ShardingContainerPoolBalancer.scala            |  2 +-
 .../main/scala/whisk/core/invoker/Invoker.scala    |  2 +-
 tests/src/test/scala/ha/ShootComponentsTests.scala | 70 +++++++++--------
 .../test/scala/services/KafkaConnectorTests.scala  | 44 ++++++++---
 .../core/connector/test/MessageFeedTests.scala     |  2 +-
 .../whisk/core/connector/test/TestConnector.scala  |  6 +-
 14 files changed, 219 insertions(+), 117 deletions(-)

diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 3e09bba..1b9b739 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -60,6 +60,8 @@ whisk {
         producer {
             acks = 1
             max-request-size = ${whisk.activation.payload.max}
+            request-timeout-ms = 30000
+            metadata-max-age-ms = 15000
         }
         consumer {
             session-timeout-ms = 30000
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 09cd3ac..51a27ad 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -19,43 +19,86 @@ package whisk.connector.kafka
 
 import java.util.Properties
 
-import scala.collection.JavaConversions.iterableAsScalaIterable
-import scala.collection.JavaConversions.seqAsJavaList
-import scala.concurrent.duration.Duration
-import scala.concurrent.duration.DurationInt
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import akka.actor.ActorSystem
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.errors.{RetriableException, WakeupException}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import pureconfig.loadConfigOrThrow
 import whisk.common.Logging
 import whisk.core.ConfigKeys
 import whisk.core.connector.MessageConsumer
 
-class KafkaConsumerConnector(kafkahost: String,
-                             groupid: String,
-                             topic: String,
-                             override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging)
+import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+
+case class KafkaConsumerConfig(sessionTimeoutMs: Long)
+
+class KafkaConsumerConnector(
+  kafkahost: String,
+  groupid: String,
+  topic: String,
+  override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging, actorSystem: ActorSystem)
     extends MessageConsumer {
 
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  private val gracefulWaitTime = 100.milliseconds
+
+  // The consumer is generally configured via getProps. This configuration only loads values necessary for "outer"
+  // logic, like the wakeup timer.
+  private val cfg = loadConfigOrThrow[KafkaConsumerConfig](ConfigKeys.kafkaConsumer)
+
   /**
    * Long poll for messages. Method returns once message are available but no later than given
    * duration.
    *
    * @param duration the maximum duration for the long poll
    */
-  override def peek(duration: Duration = 500.milliseconds) = {
-    val records = consumer.poll(duration.toMillis)
-    records map { r =>
-      (r.topic, r.partition, r.offset, r.value)
-    }
+  override def peek(duration: FiniteDuration = 500.milliseconds,
+                    retry: Int = 3): Iterable[(String, Int, Long, Array[Byte])] = {
+
+    // poll can be infinitely blocked in edge-cases, so we need to wakeup explicitly.
+    val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
+
+    try {
+      consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+    } catch {
+      // Happens if the peek hangs.
+      case _: WakeupException if retry > 0 =>
+        logging.error(this, s"poll timeout occurred. Retrying $retry more times.")
+        Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway
+        peek(duration, retry - 1)
+      case e: RetriableException if retry > 0 =>
+        logging.error(this, s"$e: Retrying $retry more times")
+        wakeUpTask.cancel()
+        Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway
+        peek(duration, retry - 1)
+      // Every other error results in a restart of the consumer
+      case e: Throwable =>
+        recreateConsumer()
+        throw e
+    } finally wakeUpTask.cancel()
   }
 
   /**
    * Commits offsets from last poll.
    */
-  def commit() = consumer.commitSync()
+  def commit(retry: Int = 3): Unit =
+    try {
+      consumer.commitSync()
+    } catch {
+      case e: RetriableException =>
+        if (retry > 0) {
+          logging.error(this, s"$e: retrying $retry more times")
+          Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `commitSync` is blocking anyway
+          commit(retry - 1)
+        } else {
+          throw e
+        }
+    }
 
-  override def close() = {
+  override def close(): Unit = {
+    consumer.close()
     logging.info(this, s"closing '$topic' consumer")
   }
 
@@ -79,9 +122,18 @@ class KafkaConsumerConnector(kafkahost: String,
     val keyDeserializer = new ByteArrayDeserializer
     val valueDeserializer = new ByteArrayDeserializer
     val consumer = new KafkaConsumer(props, keyDeserializer, valueDeserializer)
-    topics foreach { consumer.subscribe(_) }
+    topics.foreach(consumer.subscribe(_))
     consumer
   }
 
-  private val consumer = getConsumer(getProps, Some(List(topic)))
+  private def recreateConsumer(): Unit = {
+    val oldConsumer = consumer
+    Future {
+      oldConsumer.close()
+      logging.info(this, s"old consumer closed")
+    }
+    consumer = getConsumer(getProps, Some(List(topic)))
+  }
+
+  @volatile private var consumer = getConsumer(getProps, Some(List(topic)))
 }
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 3c8df85..6b0fc14 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -20,8 +20,10 @@ package whisk.connector.kafka
 import java.util.Properties
 import java.util.concurrent.ExecutionException
 
-import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.FiniteDuration
+import akka.actor.ActorSystem
+
+import scala.concurrent.duration._
 import scala.collection.JavaConverters._
 import org.apache.kafka.clients.admin.AdminClientConfig
 import org.apache.kafka.clients.admin.AdminClient
@@ -43,11 +45,12 @@ case class KafkaConfig(replicationFactor: Short)
 object KafkaMessagingProvider extends MessagingProvider {
 
   def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
-    implicit logging: Logging): MessageConsumer =
+    implicit logging: Logging,
+    actorSystem: ActorSystem): MessageConsumer =
     new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek)
 
-  def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
-    new KafkaProducerConnector(config.kafkaHosts, ec)
+  def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer =
+    new KafkaProducerConnector(config.kafkaHosts)
 
   def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = {
     val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
@@ -55,6 +58,7 @@ object KafkaMessagingProvider extends MessagingProvider {
       loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
     val props = new Properties
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
+
     val commonConfig =
       KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
     commonConfig.foreach {
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index dca7cce..0c511f3 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -19,36 +19,38 @@ package whisk.connector.kafka
 
 import java.util.Properties
 
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.util.Failure
-import scala.util.Success
-import org.apache.kafka.clients.producer.Callback
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.errors.NotLeaderForPartitionException
+import akka.actor.ActorSystem
+import akka.pattern.after
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.errors.{
+  NotEnoughReplicasAfterAppendException,
+  RecordTooLargeException,
+  RetriableException,
+  TimeoutException
+}
 import org.apache.kafka.common.serialization.StringSerializer
-import whisk.common.Counter
-import whisk.common.Logging
-import whisk.core.connector.Message
-import whisk.core.connector.MessageProducer
-import whisk.core.entity.UUIDs
 import pureconfig._
+import whisk.common.{Counter, Logging, TransactionId}
 import whisk.core.ConfigKeys
+import whisk.core.connector.{Message, MessageProducer}
+import whisk.core.entity.UUIDs
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
 
-class KafkaProducerConnector(kafkahosts: String,
-                             implicit val executionContext: ExecutionContext,
-                             id: String = UUIDs.randomUUID().toString)(implicit logging: Logging)
+class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID().toString)(implicit logging: Logging,
+                                                                                           actorSystem: ActorSystem)
     extends MessageProducer {
 
-  override def sentCount() = sentCounter.cur
+  implicit val ec: ExecutionContext = actorSystem.dispatcher
+  private val gracefulWaitTime = 100.milliseconds
+
+  override def sentCount(): Long = sentCounter.cur
 
   /** Sends msg to topic. This is an asynchronous operation. */
-  override def send(topic: String, msg: Message, retry: Int = 2): Future[RecordMetadata] = {
-    implicit val transid = msg.transid
+  override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
+    implicit val transid: TransactionId = msg.transid
     val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
     val produced = Promise[RecordMetadata]()
 
@@ -66,17 +68,25 @@ class KafkaProducerConnector(kafkahosts: String,
       case Failure(t) =>
         logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}")
     } recoverWith {
-      case t: NotLeaderForPartitionException =>
-        if (retry > 0) {
-          logging.error(this, s"NotLeaderForPartitionException is retryable, remain $retry retry")
-          Thread.sleep(100)
-          send(topic, msg, retry - 1)
-        } else produced.future
+      // Do not retry on these exceptions as they may cause duplicate messages on Kafka.
+      case _: NotEnoughReplicasAfterAppendException | _: TimeoutException =>
+        recreateProducer()
+        produced.future
+      case r: RetriableException if retry > 0 =>
+        logging.info(this, s"$r: Retrying $retry more times")
+        after(gracefulWaitTime, actorSystem.scheduler)(send(topic, msg, retry - 1))
+      // Ignore this exception as restarting the producer doesn't make sense
+      case e: RecordTooLargeException =>
+        Future.failed(e)
+      // All unknown errors just result in a recreation of the producer. The failure is propagated.
+      case _: Throwable =>
+        recreateProducer()
+        produced.future
     }
   }
 
   /** Closes producer. */
-  override def close() = {
+  override def close(): Unit = {
     logging.info(this, "closing producer")
     producer.close()
   }
@@ -104,5 +114,14 @@ class KafkaProducerConnector(kafkahosts: String,
     new KafkaProducer(props, keySerializer, valueSerializer)
   }
 
-  private val producer = getProducer(getProps)
+  private def recreateProducer(): Unit = {
+    val oldProducer = producer
+    Future {
+      oldProducer.close()
+      logging.info(this, s"old producer closed")
+    }
+    producer = getProducer(getProps)
+  }
+
+  @volatile private var producer = getProducer(getProps)
 }
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 2dd8ba3..14af69e 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -42,13 +42,13 @@ trait MessageConsumer {
    * @param duration for the long poll
    * @return iterable collection (topic, partition, offset, bytes)
    */
-  def peek(duration: Duration): Iterable[(String, Int, Long, Array[Byte])]
+  def peek(duration: FiniteDuration, retry: Int = 3): Iterable[(String, Int, Long, Array[Byte])]
 
   /**
    * Commits offsets from last peek operation to ensure they are removed
    * from the connector.
    */
-  def commit(): Unit
+  def commit(retry: Int = 3): Unit
 
   /** Closes consumer. */
   def close(): Unit
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
index 1737624..8ec1f5a 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
@@ -17,7 +17,8 @@
 
 package whisk.core.connector
 
-import scala.concurrent.ExecutionContext
+import akka.actor.ActorSystem
+
 import scala.concurrent.duration.DurationInt
 import scala.concurrent.duration.FiniteDuration
 import whisk.common.Logging
@@ -28,11 +29,12 @@ import whisk.spi.Spi
  * An Spi for providing Messaging implementations.
  */
 trait MessagingProvider extends Spi {
-  def getConsumer(config: WhiskConfig,
-                  groupId: String,
-                  topic: String,
-                  maxPeek: Int = Int.MaxValue,
-                  maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging): MessageConsumer
-  def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer
+  def getConsumer(
+    config: WhiskConfig,
+    groupId: String,
+    topic: String,
+    maxPeek: Int = Int.MaxValue,
+    maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging, actorSystem: ActorSystem): MessageConsumer
+  def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer
   def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean
 }
diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
index b615708..c632c83 100644
--- a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
+++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
@@ -60,8 +60,9 @@ class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance:
   private val instanceId = s"$component${instance.toInt}"
 
   private val msgProvider = SpiLoader.get[MessagingProvider]
-  private val cacheInvalidationConsumer = msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128)
-  private val cacheInvalidationProducer = msgProvider.getProducer(config, ec)
+  private val cacheInvalidationConsumer =
+    msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128)
+  private val cacheInvalidationProducer = msgProvider.getProducer(config)
 
   def notifyOtherInstancesAboutInvalidation(key: CacheKey): Future[Unit] = {
     cacheInvalidationProducer.send(topic, CacheInvalidationMessage(key, instanceId)).map(_ => Unit)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index a5327b0..dfa57bb 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -169,7 +169,7 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
 
   /** Gets a producer which can publish messages to the kafka bus. */
   private val messagingProvider = SpiLoader.get[MessagingProvider]
-  private val messageProducer = messagingProvider.getProducer(config, executionContext)
+  private val messageProducer = messagingProvider.getProducer(config)
 
   private def sendActivationToInvoker(producer: MessageProducer,
                                       msg: ActivationMessage,
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 2ec822a..607670d 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -173,7 +173,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
   }
 
   private val messagingProvider = SpiLoader.get[MessagingProvider]
-  private val messageProducer = messagingProvider.getProducer(config, executionContext)
+  private val messageProducer = messagingProvider.getProducer(config)
 
   /** 3. Send the activation to the invoker */
   private def sendActivationToInvoker(producer: MessageProducer,
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 4ba2214..247b303 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -182,7 +182,7 @@ object Invoker {
     if (!msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker")) {
       abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId")
     }
-    val producer = msgProvider.getProducer(config, ec)
+    val producer = msgProvider.getProducer(config)
     val invoker = try {
       new InvokerReactive(config, invokerInstance, producer)
     } catch {
diff --git a/tests/src/test/scala/ha/ShootComponentsTests.scala b/tests/src/test/scala/ha/ShootComponentsTests.scala
index 6c2d00c..fa7d3a8 100644
--- a/tests/src/test/scala/ha/ShootComponentsTests.scala
+++ b/tests/src/test/scala/ha/ShootComponentsTests.scala
@@ -47,7 +47,8 @@ class ShootComponentsTests
     with WskTestHelpers
     with ScalaFutures
     with WskActorSystem
-    with StreamLogging {
+    with StreamLogging
+    with ShootComponentUtils {
 
   implicit val wskprops = WskProps()
   val wsk = new WskRest
@@ -63,10 +64,8 @@ class ShootComponentsTests
   val allowedRequestsPerMinute = (amountOfControllers - 1.0) * limitPerController
   val timeBeweenRequests = 60.seconds / allowedRequestsPerMinute
 
-  val controller0DockerHost = WhiskProperties.getBaseControllerHost() + ":" + WhiskProperties.getProperty(
-    WhiskConfig.dockerPort)
-
-  val couchDB0DockerHost = WhiskProperties.getBaseDBHost() + ":" + WhiskProperties.getProperty(WhiskConfig.dockerPort)
+  val controller0DockerHost = WhiskProperties.getBaseControllerHost()
+  val couchDB0DockerHost = WhiskProperties.getBaseDBHost()
 
   val dbProtocol = WhiskProperties.getProperty(WhiskConfig.dbProtocol)
   val dbHostsList = WhiskProperties.getDBHosts
@@ -76,35 +75,6 @@ class ShootComponentsTests
   val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix)
   val dbWhiskAuth = WhiskProperties.getProperty(WhiskConfig.dbAuths)
 
-  private def getDockerCommand(host: String, component: String, cmd: String) = {
-    def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
-
-    val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
-
-    Seq(docker, "--host", host, cmd, component)
-  }
-
-  def restartComponent(host: String, component: String) = {
-    val cmd: Seq[String] = getDockerCommand(host, component, "restart")
-    println(s"Running command: ${cmd.mkString(" ")}")
-
-    TestUtils.runCmd(0, new File("."), cmd: _*)
-  }
-
-  def stopComponent(host: String, component: String) = {
-    val cmd: Seq[String] = getDockerCommand(host, component, "stop")
-    println(s"Running command: ${cmd.mkString(" ")}")
-
-    TestUtils.runCmd(0, new File("."), cmd: _*)
-  }
-
-  def startComponent(host: String, component: String) = {
-    val cmd: Seq[String] = getDockerCommand(host, component, "start")
-    println(s"Running command: ${cmd.mkString(" ")}")
-
-    TestUtils.runCmd(0, new File("."), cmd: _*)
-  }
-
   def ping(host: String, port: Int, path: String = "/") = {
     val response = Try {
       Http().singleRequest(HttpRequest(uri = s"http://$host:$port$path")).futureValue
@@ -317,3 +287,35 @@ class ShootComponentsTests
       }
   }
 }
+
+trait ShootComponentUtils {
+  private def getDockerCommand(host: String, component: String, cmd: String) = {
+    def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
+
+    val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
+    val dockerPort = WhiskProperties.getProperty(WhiskConfig.dockerPort)
+
+    Seq(docker, "--host", host + ":" + dockerPort, cmd, component)
+  }
+
+  def restartComponent(host: String, component: String) = {
+    val cmd: Seq[String] = getDockerCommand(host, component, "restart")
+    println(s"Running command: ${cmd.mkString(" ")}")
+
+    TestUtils.runCmd(0, new File("."), cmd: _*)
+  }
+
+  def stopComponent(host: String, component: String) = {
+    val cmd: Seq[String] = getDockerCommand(host, component, "stop")
+    println(s"Running command: ${cmd.mkString(" ")}")
+
+    TestUtils.runCmd(0, new File("."), cmd: _*)
+  }
+
+  def startComponent(host: String, component: String) = {
+    val cmd: Seq[String] = getDockerCommand(host, component, "start")
+    println(s"Running command: ${cmd.mkString(" ")}")
+
+    TestUtils.runCmd(0, new File("."), cmd: _*)
+  }
+}
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index d987cb7..3f91b59 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -21,23 +21,30 @@ import java.io.File
 import java.util.Calendar
 
 import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
+import ha.ShootComponentUtils
 import org.apache.kafka.clients.consumer.CommitFailedException
 import org.junit.runner.RunWith
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import whisk.common.TransactionId
 import whisk.connector.kafka.{KafkaConsumerConnector, KafkaMessagingProvider, KafkaProducerConnector}
 import whisk.core.WhiskConfig
 import whisk.core.connector.Message
 import whisk.utils.{retry, ExecutionContextFactory}
 
-import scala.concurrent.{Await, ExecutionContext}
 import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import scala.concurrent.{Await, ExecutionContext}
 import scala.language.postfixOps
 import scala.util.Try
 
 @RunWith(classOf[JUnitRunner])
-class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem with BeforeAndAfterAll with StreamLogging {
+class KafkaConnectorTests
+    extends FlatSpec
+    with Matchers
+    with WskActorSystem
+    with BeforeAndAfterAll
+    with StreamLogging
+    with ShootComponentUtils {
   implicit val transid: TransactionId = TransactionId.testing
   implicit val ec: ExecutionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
 
@@ -46,7 +53,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
 
   val groupid = "kafkatest"
   val topic = "KafkaConnectorTestTopic"
-  val maxPollInterval = 10 seconds
+  val maxPollInterval = 10.seconds
 
   // Need to overwrite replication factor for tests that shut down and start
   // Kafka instances intentionally. These tests will fail if there is more than
@@ -57,10 +64,10 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
   println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
   assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
 
-  println(s"Create test topic '${topic}' with replicationFactor=${replicationFactor}")
-  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic ${topic} failed")
+  println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
+  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
 
-  val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
+  val producer = new KafkaProducerConnector(config.kafkaHosts)
   val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic)
 
   override def afterAll(): Unit = {
@@ -143,18 +150,31 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
         val startLog = s", started"
         val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
 
-        commandComponent(kafkaHost, "stop", s"kafka$i")
-        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1
+        // 1. stop one of the Kafka nodes
+        stopComponent(kafkaHost, s"kafka$i")
+
+        // 2. kafka cluster should be ok at least after three retries
+        retry({
+          val received = sendAndReceiveMessage(message, 40 seconds, 40 seconds)
+          received.size should be >= 1
+        }, 3, Some(100.milliseconds))
         consumer.commit()
 
-        commandComponent(kafkaHost, "start", s"kafka$i")
+        // 3. recover stopped node
+        startComponent(kafkaHost, s"kafka$i")
+
+        // 4. wait until kafka is up
         retry({
           startLog.r
             .findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout)
             .length shouldBe prevCount + 1
-        }, 20, Some(1.second)) // wait until kafka is up
+        }, 20, Some(1.second))
 
-        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1
+        // 5. kafka cluster should be ok at least after three retries
+        retry({
+          val received = sendAndReceiveMessage(message, 40 seconds, 40 seconds)
+          received.size should be >= 1
+        }, 3, Some(100.milliseconds))
         consumer.commit()
       }
     }
diff --git a/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala
index b240edd..c6743aa 100644
--- a/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala
@@ -65,7 +65,7 @@ class MessageFeedTests
     val peekCount = new AtomicInteger()
 
     val consumer = new TestConnector("feedtest", 4, true) {
-      override def peek(duration: Duration) = {
+      override def peek(duration: FiniteDuration, retry: Int = 0) = {
         peekCount.incrementAndGet()
         super.peek(duration)
       }
diff --git a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
index e3b9597..8b428e5 100644
--- a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
@@ -21,7 +21,7 @@ import java.util.ArrayList
 import java.util.concurrent.LinkedBlockingQueue
 
 import scala.concurrent.Future
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
 import scala.collection.JavaConversions._
 
 import org.apache.kafka.clients.producer.RecordMetadata
@@ -37,7 +37,7 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
     extends MessageConsumer
     with StreamLogging {
 
-  override def peek(duration: Duration) = {
+  override def peek(duration: FiniteDuration, retry: Int = 0) = {
     val msgs = new ArrayList[Message]
     queue.synchronized {
       queue.drainTo(msgs, if (allowMoreThanMax) Int.MaxValue else maxPeek)
@@ -48,7 +48,7 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
     }
   }
 
-  override def commit() = {
+  override def commit(retry: Int = 0) = {
     if (throwCommitException) {
       throw new Exception("commit failed")
     } else {

-- 
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.