You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2018/02/26 09:55:21 UTC
[incubator-openwhisk] branch master updated: Enhance Kafka message
provider for resiliency. (#3072)
This is an automated email from the ASF dual-hosted git repository.
markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new a91feb0 Enhance Kafka message provider for resiliency. (#3072)
a91feb0 is described below
commit a91feb013dde79eca2e892f70840157f9b18ccb4
Author: Sang Heon Lee <de...@gmail.com>
AuthorDate: Mon Feb 26 18:55:19 2018 +0900
Enhance Kafka message provider for resiliency. (#3072)
Co-authored-by: Markus Thömmes <ma...@me.com>
---
common/scala/src/main/resources/application.conf | 2 +
.../connector/kafka/KafkaConsumerConnector.scala | 90 +++++++++++++++++-----
.../connector/kafka/KafkaMessagingProvider.scala | 12 ++-
.../connector/kafka/KafkaProducerConnector.scala | 79 +++++++++++--------
.../whisk/core/connector/MessageConsumer.scala | 4 +-
.../whisk/core/connector/MessagingProvider.scala | 16 ++--
.../core/database/RemoteCacheInvalidation.scala | 5 +-
.../core/loadBalancer/ContainerPoolBalancer.scala | 2 +-
.../ShardingContainerPoolBalancer.scala | 2 +-
.../main/scala/whisk/core/invoker/Invoker.scala | 2 +-
tests/src/test/scala/ha/ShootComponentsTests.scala | 70 +++++++++--------
.../test/scala/services/KafkaConnectorTests.scala | 44 ++++++++---
.../core/connector/test/MessageFeedTests.scala | 2 +-
.../whisk/core/connector/test/TestConnector.scala | 6 +-
14 files changed, 219 insertions(+), 117 deletions(-)
diff --git a/common/scala/src/main/resources/application.conf b/common/scala/src/main/resources/application.conf
index 3e09bba..1b9b739 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -60,6 +60,8 @@ whisk {
producer {
acks = 1
max-request-size = ${whisk.activation.payload.max}
+ request-timeout-ms = 30000
+ metadata-max-age-ms = 15000
}
consumer {
session-timeout-ms = 30000
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
index 09cd3ac..51a27ad 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaConsumerConnector.scala
@@ -19,43 +19,86 @@ package whisk.connector.kafka
import java.util.Properties
-import scala.collection.JavaConversions.iterableAsScalaIterable
-import scala.collection.JavaConversions.seqAsJavaList
-import scala.concurrent.duration.Duration
-import scala.concurrent.duration.DurationInt
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.consumer.KafkaConsumer
+import akka.actor.ActorSystem
+import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
+import org.apache.kafka.common.errors.{RetriableException, WakeupException}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import pureconfig.loadConfigOrThrow
import whisk.common.Logging
import whisk.core.ConfigKeys
import whisk.core.connector.MessageConsumer
-class KafkaConsumerConnector(kafkahost: String,
- groupid: String,
- topic: String,
- override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging)
+import scala.collection.JavaConversions.{iterableAsScalaIterable, seqAsJavaList}
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future}
+
+case class KafkaConsumerConfig(sessionTimeoutMs: Long)
+
+class KafkaConsumerConnector(
+ kafkahost: String,
+ groupid: String,
+ topic: String,
+ override val maxPeek: Int = Int.MaxValue)(implicit logging: Logging, actorSystem: ActorSystem)
extends MessageConsumer {
+ implicit val ec: ExecutionContext = actorSystem.dispatcher
+ private val gracefulWaitTime = 100.milliseconds
+
+ // The consumer is generally configured via getProps. This configuration only loads values necessary for "outer"
+ // logic, like the wakeup timer.
+ private val cfg = loadConfigOrThrow[KafkaConsumerConfig](ConfigKeys.kafkaConsumer)
+
/**
* Long poll for messages. Method returns once messages are available but no later than the given
* duration.
*
* @param duration the maximum duration for the long poll
*/
- override def peek(duration: Duration = 500.milliseconds) = {
- val records = consumer.poll(duration.toMillis)
- records map { r =>
- (r.topic, r.partition, r.offset, r.value)
- }
+ override def peek(duration: FiniteDuration = 500.milliseconds,
+ retry: Int = 3): Iterable[(String, Int, Long, Array[Byte])] = {
+
+ // poll can be infinitely blocked in edge-cases, so we need to wake up explicitly.
+ val wakeUpTask = actorSystem.scheduler.scheduleOnce(cfg.sessionTimeoutMs.milliseconds + 1.second)(consumer.wakeup())
+
+ try {
+ consumer.poll(duration.toMillis).map(r => (r.topic, r.partition, r.offset, r.value))
+ } catch {
+ // Happens if the peek hangs.
+ case _: WakeupException if retry > 0 =>
+ logging.error(this, s"poll timeout occurred. Retrying $retry more times.")
+ Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway
+ peek(duration, retry - 1)
+ case e: RetriableException if retry > 0 =>
+ logging.error(this, s"$e: Retrying $retry more times")
+ wakeUpTask.cancel()
+ Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `poll` is blocking anyway
+ peek(duration, retry - 1)
+ // Every other error results in a restart of the consumer
+ case e: Throwable =>
+ recreateConsumer()
+ throw e
+ } finally wakeUpTask.cancel()
}
/**
* Commits offsets from last poll.
*/
- def commit() = consumer.commitSync()
+ def commit(retry: Int = 3): Unit =
+ try {
+ consumer.commitSync()
+ } catch {
+ case e: RetriableException =>
+ if (retry > 0) {
+ logging.error(this, s"$e: retrying $retry more times")
+ Thread.sleep(gracefulWaitTime.toMillis) // using Thread.sleep is okay, since `commitSync` is blocking anyway
+ commit(retry - 1)
+ } else {
+ throw e
+ }
+ }
- override def close() = {
+ override def close(): Unit = {
+ consumer.close()
logging.info(this, s"closing '$topic' consumer")
}
@@ -79,9 +122,18 @@ class KafkaConsumerConnector(kafkahost: String,
val keyDeserializer = new ByteArrayDeserializer
val valueDeserializer = new ByteArrayDeserializer
val consumer = new KafkaConsumer(props, keyDeserializer, valueDeserializer)
- topics foreach { consumer.subscribe(_) }
+ topics.foreach(consumer.subscribe(_))
consumer
}
- private val consumer = getConsumer(getProps, Some(List(topic)))
+ private def recreateConsumer(): Unit = {
+ val oldConsumer = consumer
+ Future {
+ oldConsumer.close()
+ logging.info(this, s"old consumer closed")
+ }
+ consumer = getConsumer(getProps, Some(List(topic)))
+ }
+
+ @volatile private var consumer = getConsumer(getProps, Some(List(topic)))
}
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
index 3c8df85..6b0fc14 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaMessagingProvider.scala
@@ -20,8 +20,10 @@ package whisk.connector.kafka
import java.util.Properties
import java.util.concurrent.ExecutionException
-import scala.concurrent.ExecutionContext
import scala.concurrent.duration.FiniteDuration
+import akka.actor.ActorSystem
+
+import scala.concurrent.duration._
import scala.collection.JavaConverters._
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.AdminClient
@@ -43,11 +45,12 @@ case class KafkaConfig(replicationFactor: Short)
object KafkaMessagingProvider extends MessagingProvider {
def getConsumer(config: WhiskConfig, groupId: String, topic: String, maxPeek: Int, maxPollInterval: FiniteDuration)(
- implicit logging: Logging): MessageConsumer =
+ implicit logging: Logging,
+ actorSystem: ActorSystem): MessageConsumer =
new KafkaConsumerConnector(config.kafkaHosts, groupId, topic, maxPeek)
- def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer =
- new KafkaProducerConnector(config.kafkaHosts, ec)
+ def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer =
+ new KafkaProducerConnector(config.kafkaHosts)
def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean = {
val kc = loadConfigOrThrow[KafkaConfig](ConfigKeys.kafka)
@@ -55,6 +58,7 @@ object KafkaMessagingProvider extends MessagingProvider {
loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaTopics + s".$topicConfig"))
val props = new Properties
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaHosts)
+
val commonConfig =
KafkaConfiguration.configMapToKafkaConfig(loadConfigOrThrow[Map[String, String]](ConfigKeys.kafkaCommon))
commonConfig.foreach {
diff --git a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
index dca7cce..0c511f3 100644
--- a/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
+++ b/common/scala/src/main/scala/whisk/connector/kafka/KafkaProducerConnector.scala
@@ -19,36 +19,38 @@ package whisk.connector.kafka
import java.util.Properties
-import scala.concurrent.ExecutionContext
-import scala.concurrent.Future
-import scala.concurrent.Promise
-import scala.util.Failure
-import scala.util.Success
-import org.apache.kafka.clients.producer.Callback
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.clients.producer.RecordMetadata
-import org.apache.kafka.common.errors.NotLeaderForPartitionException
+import akka.actor.ActorSystem
+import akka.pattern.after
+import org.apache.kafka.clients.producer._
+import org.apache.kafka.common.errors.{
+ NotEnoughReplicasAfterAppendException,
+ RecordTooLargeException,
+ RetriableException,
+ TimeoutException
+}
import org.apache.kafka.common.serialization.StringSerializer
-import whisk.common.Counter
-import whisk.common.Logging
-import whisk.core.connector.Message
-import whisk.core.connector.MessageProducer
-import whisk.core.entity.UUIDs
import pureconfig._
+import whisk.common.{Counter, Logging, TransactionId}
import whisk.core.ConfigKeys
+import whisk.core.connector.{Message, MessageProducer}
+import whisk.core.entity.UUIDs
+
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
-class KafkaProducerConnector(kafkahosts: String,
- implicit val executionContext: ExecutionContext,
- id: String = UUIDs.randomUUID().toString)(implicit logging: Logging)
+class KafkaProducerConnector(kafkahosts: String, id: String = UUIDs.randomUUID().toString)(implicit logging: Logging,
+ actorSystem: ActorSystem)
extends MessageProducer {
- override def sentCount() = sentCounter.cur
+ implicit val ec: ExecutionContext = actorSystem.dispatcher
+ private val gracefulWaitTime = 100.milliseconds
+
+ override def sentCount(): Long = sentCounter.cur
/** Sends msg to topic. This is an asynchronous operation. */
- override def send(topic: String, msg: Message, retry: Int = 2): Future[RecordMetadata] = {
- implicit val transid = msg.transid
+ override def send(topic: String, msg: Message, retry: Int = 3): Future[RecordMetadata] = {
+ implicit val transid: TransactionId = msg.transid
val record = new ProducerRecord[String, String](topic, "messages", msg.serialize)
val produced = Promise[RecordMetadata]()
@@ -66,17 +68,25 @@ class KafkaProducerConnector(kafkahosts: String,
case Failure(t) =>
logging.error(this, s"sending message on topic '$topic' failed: ${t.getMessage}")
} recoverWith {
- case t: NotLeaderForPartitionException =>
- if (retry > 0) {
- logging.error(this, s"NotLeaderForPartitionException is retryable, remain $retry retry")
- Thread.sleep(100)
- send(topic, msg, retry - 1)
- } else produced.future
+ // Do not retry on these exceptions as they may cause duplicate messages on Kafka.
+ case _: NotEnoughReplicasAfterAppendException | _: TimeoutException =>
+ recreateProducer()
+ produced.future
+ case r: RetriableException if retry > 0 =>
+ logging.info(this, s"$r: Retrying $retry more times")
+ after(gracefulWaitTime, actorSystem.scheduler)(send(topic, msg, retry - 1))
+ // Ignore this exception as restarting the producer doesn't make sense
+ case e: RecordTooLargeException =>
+ Future.failed(e)
+ // All unknown errors just result in a recreation of the producer. The failure is propagated.
+ case _: Throwable =>
+ recreateProducer()
+ produced.future
}
}
/** Closes producer. */
- override def close() = {
+ override def close(): Unit = {
logging.info(this, "closing producer")
producer.close()
}
@@ -104,5 +114,14 @@ class KafkaProducerConnector(kafkahosts: String,
new KafkaProducer(props, keySerializer, valueSerializer)
}
- private val producer = getProducer(getProps)
+ private def recreateProducer(): Unit = {
+ val oldProducer = producer
+ Future {
+ oldProducer.close()
+ logging.info(this, s"old producer closed")
+ }
+ producer = getProducer(getProps)
+ }
+
+ @volatile private var producer = getProducer(getProps)
}
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
index 2dd8ba3..14af69e 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessageConsumer.scala
@@ -42,13 +42,13 @@ trait MessageConsumer {
* @param duration for the long poll
* @return iterable collection (topic, partition, offset, bytes)
*/
- def peek(duration: Duration): Iterable[(String, Int, Long, Array[Byte])]
+ def peek(duration: FiniteDuration, retry: Int = 3): Iterable[(String, Int, Long, Array[Byte])]
/**
* Commits offsets from last peek operation to ensure they are removed
* from the connector.
*/
- def commit(): Unit
+ def commit(retry: Int = 3): Unit
/** Closes consumer. */
def close(): Unit
diff --git a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
index 1737624..8ec1f5a 100644
--- a/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
+++ b/common/scala/src/main/scala/whisk/core/connector/MessagingProvider.scala
@@ -17,7 +17,8 @@
package whisk.core.connector
-import scala.concurrent.ExecutionContext
+import akka.actor.ActorSystem
+
import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration
import whisk.common.Logging
@@ -28,11 +29,12 @@ import whisk.spi.Spi
* An Spi for providing Messaging implementations.
*/
trait MessagingProvider extends Spi {
- def getConsumer(config: WhiskConfig,
- groupId: String,
- topic: String,
- maxPeek: Int = Int.MaxValue,
- maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging): MessageConsumer
- def getProducer(config: WhiskConfig, ec: ExecutionContext)(implicit logging: Logging): MessageProducer
+ def getConsumer(
+ config: WhiskConfig,
+ groupId: String,
+ topic: String,
+ maxPeek: Int = Int.MaxValue,
+ maxPollInterval: FiniteDuration = 5.minutes)(implicit logging: Logging, actorSystem: ActorSystem): MessageConsumer
+ def getProducer(config: WhiskConfig)(implicit logging: Logging, actorSystem: ActorSystem): MessageProducer
def ensureTopic(config: WhiskConfig, topic: String, topicConfig: String)(implicit logging: Logging): Boolean
}
diff --git a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
index b615708..c632c83 100644
--- a/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
+++ b/common/scala/src/main/scala/whisk/core/database/RemoteCacheInvalidation.scala
@@ -60,8 +60,9 @@ class RemoteCacheInvalidation(config: WhiskConfig, component: String, instance:
private val instanceId = s"$component${instance.toInt}"
private val msgProvider = SpiLoader.get[MessagingProvider]
- private val cacheInvalidationConsumer = msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128)
- private val cacheInvalidationProducer = msgProvider.getProducer(config, ec)
+ private val cacheInvalidationConsumer =
+ msgProvider.getConsumer(config, s"$topic$instanceId", topic, maxPeek = 128)
+ private val cacheInvalidationProducer = msgProvider.getProducer(config)
def notifyOtherInstancesAboutInvalidation(key: CacheKey): Future[Unit] = {
cacheInvalidationProducer.send(topic, CacheInvalidationMessage(key, instanceId)).map(_ => Unit)
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
index a5327b0..dfa57bb 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ContainerPoolBalancer.scala
@@ -169,7 +169,7 @@ class ContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)
/** Gets a producer which can publish messages to the kafka bus. */
private val messagingProvider = SpiLoader.get[MessagingProvider]
- private val messageProducer = messagingProvider.getProducer(config, executionContext)
+ private val messageProducer = messagingProvider.getProducer(config)
private def sendActivationToInvoker(producer: MessageProducer,
msg: ActivationMessage,
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 2ec822a..607670d 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -173,7 +173,7 @@ class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: Ins
}
private val messagingProvider = SpiLoader.get[MessagingProvider]
- private val messageProducer = messagingProvider.getProducer(config, executionContext)
+ private val messageProducer = messagingProvider.getProducer(config)
/** 3. Send the activation to the invoker */
private def sendActivationToInvoker(producer: MessageProducer,
diff --git a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
index 4ba2214..247b303 100644
--- a/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
+++ b/core/invoker/src/main/scala/whisk/core/invoker/Invoker.scala
@@ -182,7 +182,7 @@ object Invoker {
if (!msgProvider.ensureTopic(config, topic = "invoker" + assignedInvokerId, topicConfig = "invoker")) {
abort(s"failure during msgProvider.ensureTopic for topic invoker$assignedInvokerId")
}
- val producer = msgProvider.getProducer(config, ec)
+ val producer = msgProvider.getProducer(config)
val invoker = try {
new InvokerReactive(config, invokerInstance, producer)
} catch {
diff --git a/tests/src/test/scala/ha/ShootComponentsTests.scala b/tests/src/test/scala/ha/ShootComponentsTests.scala
index 6c2d00c..fa7d3a8 100644
--- a/tests/src/test/scala/ha/ShootComponentsTests.scala
+++ b/tests/src/test/scala/ha/ShootComponentsTests.scala
@@ -47,7 +47,8 @@ class ShootComponentsTests
with WskTestHelpers
with ScalaFutures
with WskActorSystem
- with StreamLogging {
+ with StreamLogging
+ with ShootComponentUtils {
implicit val wskprops = WskProps()
val wsk = new WskRest
@@ -63,10 +64,8 @@ class ShootComponentsTests
val allowedRequestsPerMinute = (amountOfControllers - 1.0) * limitPerController
val timeBeweenRequests = 60.seconds / allowedRequestsPerMinute
- val controller0DockerHost = WhiskProperties.getBaseControllerHost() + ":" + WhiskProperties.getProperty(
- WhiskConfig.dockerPort)
-
- val couchDB0DockerHost = WhiskProperties.getBaseDBHost() + ":" + WhiskProperties.getProperty(WhiskConfig.dockerPort)
+ val controller0DockerHost = WhiskProperties.getBaseControllerHost()
+ val couchDB0DockerHost = WhiskProperties.getBaseDBHost()
val dbProtocol = WhiskProperties.getProperty(WhiskConfig.dbProtocol)
val dbHostsList = WhiskProperties.getDBHosts
@@ -76,35 +75,6 @@ class ShootComponentsTests
val dbPrefix = WhiskProperties.getProperty(WhiskConfig.dbPrefix)
val dbWhiskAuth = WhiskProperties.getProperty(WhiskConfig.dbAuths)
- private def getDockerCommand(host: String, component: String, cmd: String) = {
- def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
-
- val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
-
- Seq(docker, "--host", host, cmd, component)
- }
-
- def restartComponent(host: String, component: String) = {
- val cmd: Seq[String] = getDockerCommand(host, component, "restart")
- println(s"Running command: ${cmd.mkString(" ")}")
-
- TestUtils.runCmd(0, new File("."), cmd: _*)
- }
-
- def stopComponent(host: String, component: String) = {
- val cmd: Seq[String] = getDockerCommand(host, component, "stop")
- println(s"Running command: ${cmd.mkString(" ")}")
-
- TestUtils.runCmd(0, new File("."), cmd: _*)
- }
-
- def startComponent(host: String, component: String) = {
- val cmd: Seq[String] = getDockerCommand(host, component, "start")
- println(s"Running command: ${cmd.mkString(" ")}")
-
- TestUtils.runCmd(0, new File("."), cmd: _*)
- }
-
def ping(host: String, port: Int, path: String = "/") = {
val response = Try {
Http().singleRequest(HttpRequest(uri = s"http://$host:$port$path")).futureValue
@@ -317,3 +287,35 @@ class ShootComponentsTests
}
}
}
+
+trait ShootComponentUtils {
+ private def getDockerCommand(host: String, component: String, cmd: String) = {
+ def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
+
+ val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
+ val dockerPort = WhiskProperties.getProperty(WhiskConfig.dockerPort)
+
+ Seq(docker, "--host", host + ":" + dockerPort, cmd, component)
+ }
+
+ def restartComponent(host: String, component: String) = {
+ val cmd: Seq[String] = getDockerCommand(host, component, "restart")
+ println(s"Running command: ${cmd.mkString(" ")}")
+
+ TestUtils.runCmd(0, new File("."), cmd: _*)
+ }
+
+ def stopComponent(host: String, component: String) = {
+ val cmd: Seq[String] = getDockerCommand(host, component, "stop")
+ println(s"Running command: ${cmd.mkString(" ")}")
+
+ TestUtils.runCmd(0, new File("."), cmd: _*)
+ }
+
+ def startComponent(host: String, component: String) = {
+ val cmd: Seq[String] = getDockerCommand(host, component, "start")
+ println(s"Running command: ${cmd.mkString(" ")}")
+
+ TestUtils.runCmd(0, new File("."), cmd: _*)
+ }
+}
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index d987cb7..3f91b59 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -21,23 +21,30 @@ import java.io.File
import java.util.Calendar
import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
+import ha.ShootComponentUtils
import org.apache.kafka.clients.consumer.CommitFailedException
import org.junit.runner.RunWith
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.scalatest.junit.JUnitRunner
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import whisk.common.TransactionId
import whisk.connector.kafka.{KafkaConsumerConnector, KafkaMessagingProvider, KafkaProducerConnector}
import whisk.core.WhiskConfig
import whisk.core.connector.Message
import whisk.utils.{retry, ExecutionContextFactory}
-import scala.concurrent.{Await, ExecutionContext}
import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import scala.concurrent.{Await, ExecutionContext}
import scala.language.postfixOps
import scala.util.Try
@RunWith(classOf[JUnitRunner])
-class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem with BeforeAndAfterAll with StreamLogging {
+class KafkaConnectorTests
+ extends FlatSpec
+ with Matchers
+ with WskActorSystem
+ with BeforeAndAfterAll
+ with StreamLogging
+ with ShootComponentUtils {
implicit val transid: TransactionId = TransactionId.testing
implicit val ec: ExecutionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
@@ -46,7 +53,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
val groupid = "kafkatest"
val topic = "KafkaConnectorTestTopic"
- val maxPollInterval = 10 seconds
+ val maxPollInterval = 10.seconds
// Need to overwrite replication factor for tests that shut down and start
// Kafka instances intentionally. These tests will fail if there is more than
@@ -57,10 +64,10 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
- println(s"Create test topic '${topic}' with replicationFactor=${replicationFactor}")
- assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic ${topic} failed")
+ println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
+ assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
- val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
+ val producer = new KafkaProducerConnector(config.kafkaHosts)
val consumer = new KafkaConsumerConnector(config.kafkaHosts, groupid, topic)
override def afterAll(): Unit = {
@@ -143,18 +150,31 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
val startLog = s", started"
val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
- commandComponent(kafkaHost, "stop", s"kafka$i")
- sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1
+ // 1. stop one of the kafka nodes
+ stopComponent(kafkaHost, s"kafka$i")
+
+ // 2. kafka cluster should recover within at most three retries
+ retry({
+ val received = sendAndReceiveMessage(message, 40 seconds, 40 seconds)
+ received.size should be >= 1
+ }, 3, Some(100.milliseconds))
consumer.commit()
- commandComponent(kafkaHost, "start", s"kafka$i")
+ // 3. recover stopped node
+ startComponent(kafkaHost, s"kafka$i")
+
+ // 4. wait until kafka is up
retry({
startLog.r
.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout)
.length shouldBe prevCount + 1
- }, 20, Some(1.second)) // wait until kafka is up
+ }, 20, Some(1.second))
- sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1
+ // 5. kafka cluster should recover within at most three retries
+ retry({
+ val received = sendAndReceiveMessage(message, 40 seconds, 40 seconds)
+ received.size should be >= 1
+ }, 3, Some(100.milliseconds))
consumer.commit()
}
}
diff --git a/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala
index b240edd..c6743aa 100644
--- a/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/MessageFeedTests.scala
@@ -65,7 +65,7 @@ class MessageFeedTests
val peekCount = new AtomicInteger()
val consumer = new TestConnector("feedtest", 4, true) {
- override def peek(duration: Duration) = {
+ override def peek(duration: FiniteDuration, retry: Int = 0) = {
peekCount.incrementAndGet()
super.peek(duration)
}
diff --git a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
index e3b9597..8b428e5 100644
--- a/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
+++ b/tests/src/test/scala/whisk/core/connector/test/TestConnector.scala
@@ -21,7 +21,7 @@ import java.util.ArrayList
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.Future
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
import scala.collection.JavaConversions._
import org.apache.kafka.clients.producer.RecordMetadata
@@ -37,7 +37,7 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
extends MessageConsumer
with StreamLogging {
- override def peek(duration: Duration) = {
+ override def peek(duration: FiniteDuration, retry: Int = 0) = {
val msgs = new ArrayList[Message]
queue.synchronized {
queue.drainTo(msgs, if (allowMoreThanMax) Int.MaxValue else maxPeek)
@@ -48,7 +48,7 @@ class TestConnector(topic: String, override val maxPeek: Int, allowMoreThanMax:
}
}
- override def commit() = {
+ override def commit(retry: Int = 0) = {
if (throwCommitException) {
throw new Exception("commit failed")
} else {
--
To stop receiving notification emails like this one, please contact
markusthoemmes@apache.org.