You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2018/02/10 19:07:30 UTC
[incubator-openwhisk] branch master updated: Rewrite Kafka test and
accommodate for eventual consistency. (#3271)
This is an automated email from the ASF dual-hosted git repository.
csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new add7040 Rewrite Kafka test and accommodate for eventual consistency. (#3271)
add7040 is described below
commit add7040f5597780901bdb44fc25b5b67f9d82018
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Sat Feb 10 20:07:25 2018 +0100
Rewrite Kafka test and accommodate for eventual consistency. (#3271)
---
.../test/scala/services/KafkaConnectorTests.scala | 119 ++++++++++-----------
1 file changed, 59 insertions(+), 60 deletions(-)
diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index d0c3019..7761856 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -20,30 +20,26 @@ package services
import java.io.File
import java.util.Calendar
-import scala.concurrent.Await
-import scala.concurrent.duration.{DurationInt, FiniteDuration}
-import scala.language.postfixOps
-import scala.util.Try
+import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
import org.apache.kafka.clients.consumer.CommitFailedException
import org.junit.runner.RunWith
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import org.scalatest.junit.JUnitRunner
-import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
import whisk.common.TransactionId
-import whisk.connector.kafka.KafkaConsumerConnector
-import whisk.connector.kafka.KafkaProducerConnector
-import whisk.connector.kafka.KafkaMessagingProvider
+import whisk.connector.kafka.{KafkaConsumerConnector, KafkaMessagingProvider, KafkaProducerConnector}
import whisk.core.WhiskConfig
import whisk.core.connector.Message
-import whisk.utils.ExecutionContextFactory
-import whisk.utils.retry
+import whisk.utils.{retry, ExecutionContextFactory}
+
+import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import scala.language.postfixOps
+import scala.util.Try
@RunWith(classOf[JUnitRunner])
class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem with BeforeAndAfterAll with StreamLogging {
- implicit val transid = TransactionId.testing
- implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+ implicit val transid: TransactionId = TransactionId.testing
+ implicit val ec: ExecutionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
val config = new WhiskConfig(WhiskConfig.kafkaHosts)
assert(config.isValid)
@@ -54,14 +50,14 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
// Need to overwrite replication factor for tests that shut down and start
// Kafka instances intentionally. These tests will fail if there is more than
// one Kafka host but a replication factor of 1.
- val kafkaHosts = config.kafkaHosts.split(",")
- val replicationFactor = kafkaHosts.length / 2 + 1
+ val kafkaHosts: Array[String] = config.kafkaHosts.split(",")
+ val replicationFactor: Int = kafkaHosts.length / 2 + 1
System.setProperty("whisk.kafka.replication-factor", replicationFactor.toString)
- println(s"Create test topic '${topic}' with replicationFactor=${replicationFactor}")
- assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic ${topic} failed")
+ println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
+ assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
- val sessionTimeout = 10 seconds
- val maxPollInterval = 10 seconds
+ val sessionTimeout: FiniteDuration = 10 seconds
+ val maxPollInterval: FiniteDuration = 10 seconds
val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
val consumer = new KafkaConsumerConnector(
config.kafkaHosts,
@@ -70,13 +66,13 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
sessionTimeout = sessionTimeout,
maxPollInterval = maxPollInterval)
- override def afterAll() = {
+ override def afterAll(): Unit = {
producer.close()
consumer.close()
super.afterAll()
}
- def commandComponent(host: String, command: String, component: String) = {
+ def commandComponent(host: String, command: String, component: String): TestUtils.RunResult = {
def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
val dockerPort = WhiskProperties.getProperty(WhiskConfig.dockerPort)
@@ -88,67 +84,70 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
def sendAndReceiveMessage(message: Message,
waitForSend: FiniteDuration,
waitForReceive: FiniteDuration): Iterable[String] = {
- val start = java.lang.System.currentTimeMillis
- println(s"Send message to topic.\n")
- val sent = Await.result(producer.send(topic, message), waitForSend)
- println(s"Successfully sent message to topic: ${sent}\n")
- println(s"Receiving message from topic.\n")
- val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") }
- val end = java.lang.System.currentTimeMillis
- val elapsed = end - start
- println(s"Received ${received.size}. Took $elapsed msec: $received\n")
-
- received
+ retry {
+ val start = java.lang.System.currentTimeMillis
+ println(s"Send message to topic.")
+ val sent = Await.result(producer.send(topic, message), waitForSend)
+ println(s"Successfully sent message to topic: $sent")
+ println(s"Receiving message from topic.")
+ val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") }
+ val end = java.lang.System.currentTimeMillis
+ val elapsed = end - start
+ println(s"Received ${received.size}. Took $elapsed msec: $received")
+
+ received.last should be(message.serialize)
+ received
+ }
}
+ def createMessage(): Message = new Message { override val serialize: String = Calendar.getInstance.getTime.toString }
+
behavior of "Kafka connector"
it should "send and receive a kafka message which sets up the topic" in {
for (i <- 0 until 5) {
- val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
+ val message = createMessage()
val received = sendAndReceiveMessage(message, 20 seconds, 10 seconds)
received.size should be >= 1
- received.last should be(message.serialize)
consumer.commit()
}
}
it should "send and receive a kafka message even after session timeout" in {
- for (i <- 0 until 4) {
- val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
- val received = sendAndReceiveMessage(message, 1 seconds, 1 seconds)
+ // "clear" the topic so there are 0 messages to be read
+ sendAndReceiveMessage(createMessage(), 1 seconds, 1 seconds)
+ consumer.commit()
- // only the last iteration will have an updated cursor
- // iteration 0: get whatever is on the topic (at least 1 but may be more if a previous test failed)
- // iteration 1: get iteration 0 records + 1 more (since we intentionally failed the commit on previous iteration)
- // iteration 2: get iteration 1 records + 1 more (since we intentionally failed the commit on previous iteration)
- // iteration 3: get exactly 1 records since iteration 2 should have forwarded the cursor
- if (i < 3) {
- received.size should be >= i + 1
- } else {
- received.size should be(1)
- }
- received.last should be(message.serialize)
+ (1 to 2).foreach { i =>
+ val message = createMessage()
+ val received = sendAndReceiveMessage(message, 1 seconds, 1 seconds)
+ received.size shouldBe i // should accumulate since the commits fail
- if (i < 2) {
- Thread.sleep((maxPollInterval + 1.second).toMillis)
- a[CommitFailedException] should be thrownBy {
- consumer.commit() // sleep should cause commit to fail
- }
- } else consumer.commit()
+ Thread.sleep((maxPollInterval + 1.second).toMillis)
+ a[CommitFailedException] should be thrownBy consumer.commit()
}
+
+ val message3 = createMessage()
+ val received3 = sendAndReceiveMessage(message3, 1 seconds, 1 seconds)
+ received3.size shouldBe 2 + 1 // since the last commit still failed
+ consumer.commit()
+
+ val message4 = createMessage()
+ val received4 = sendAndReceiveMessage(message4, 1 seconds, 1 seconds)
+ received4.size shouldBe 1
+ consumer.commit()
}
if (kafkaHosts.length > 1) {
it should "send and receive a kafka message even after shutdown one of instances" in {
- for (i <- 0 until kafkaHosts.length) {
- val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
+ kafkaHosts.indices.foreach { i =>
+ val message = createMessage()
val kafkaHost = kafkaHosts(i).split(":")(0)
val startLog = s", started"
val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
commandComponent(kafkaHost, "stop", s"kafka$i")
- sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size (1)
+ sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1
consumer.commit()
commandComponent(kafkaHost, "start", s"kafka$i")
@@ -158,7 +157,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
.length shouldBe prevCount + 1
}, 20, Some(1.second)) // wait until kafka is up
- sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size (1)
+ sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1
consumer.commit()
}
}
--
To stop receiving notification emails like this one, please contact
csantanapr@apache.org.