You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cs...@apache.org on 2018/02/10 19:07:30 UTC

[incubator-openwhisk] branch master updated: Rewrite Kafka test and accommodate for eventual consistency. (#3271)

This is an automated email from the ASF dual-hosted git repository.

csantanapr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new add7040  Rewrite Kafka test and accommodate for eventual consistency. (#3271)
add7040 is described below

commit add7040f5597780901bdb44fc25b5b67f9d82018
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Sat Feb 10 20:07:25 2018 +0100

    Rewrite Kafka test and accommodate for eventual consistency. (#3271)
---
 .../test/scala/services/KafkaConnectorTests.scala  | 119 ++++++++++-----------
 1 file changed, 59 insertions(+), 60 deletions(-)

diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index d0c3019..7761856 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -20,30 +20,26 @@ package services
 import java.io.File
 import java.util.Calendar
 
-import scala.concurrent.Await
-import scala.concurrent.duration.{DurationInt, FiniteDuration}
-import scala.language.postfixOps
-import scala.util.Try
+import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
 import org.apache.kafka.clients.consumer.CommitFailedException
 import org.junit.runner.RunWith
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import org.scalatest.junit.JUnitRunner
-import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
 import whisk.common.TransactionId
-import whisk.connector.kafka.KafkaConsumerConnector
-import whisk.connector.kafka.KafkaProducerConnector
-import whisk.connector.kafka.KafkaMessagingProvider
+import whisk.connector.kafka.{KafkaConsumerConnector, KafkaMessagingProvider, KafkaProducerConnector}
 import whisk.core.WhiskConfig
 import whisk.core.connector.Message
-import whisk.utils.ExecutionContextFactory
-import whisk.utils.retry
+import whisk.utils.{retry, ExecutionContextFactory}
+
+import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import scala.language.postfixOps
+import scala.util.Try
 
 @RunWith(classOf[JUnitRunner])
 class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem with BeforeAndAfterAll with StreamLogging {
-  implicit val transid = TransactionId.testing
-  implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+  implicit val transid: TransactionId = TransactionId.testing
+  implicit val ec: ExecutionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
 
   val config = new WhiskConfig(WhiskConfig.kafkaHosts)
   assert(config.isValid)
@@ -54,14 +50,14 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
   // Need to overwrite replication factor for tests that shut down and start
   // Kafka instances intentionally. These tests will fail if there is more than
   // one Kafka host but a replication factor of 1.
-  val kafkaHosts = config.kafkaHosts.split(",")
-  val replicationFactor = kafkaHosts.length / 2 + 1
+  val kafkaHosts: Array[String] = config.kafkaHosts.split(",")
+  val replicationFactor: Int = kafkaHosts.length / 2 + 1
   System.setProperty("whisk.kafka.replication-factor", replicationFactor.toString)
-  println(s"Create test topic '${topic}' with replicationFactor=${replicationFactor}")
-  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic ${topic} failed")
+  println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
+  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
 
-  val sessionTimeout = 10 seconds
-  val maxPollInterval = 10 seconds
+  val sessionTimeout: FiniteDuration = 10 seconds
+  val maxPollInterval: FiniteDuration = 10 seconds
   val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
   val consumer = new KafkaConsumerConnector(
     config.kafkaHosts,
@@ -70,13 +66,13 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
     sessionTimeout = sessionTimeout,
     maxPollInterval = maxPollInterval)
 
-  override def afterAll() = {
+  override def afterAll(): Unit = {
     producer.close()
     consumer.close()
     super.afterAll()
   }
 
-  def commandComponent(host: String, command: String, component: String) = {
+  def commandComponent(host: String, command: String, component: String): TestUtils.RunResult = {
     def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
     val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
     val dockerPort = WhiskProperties.getProperty(WhiskConfig.dockerPort)
@@ -88,67 +84,70 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
   def sendAndReceiveMessage(message: Message,
                             waitForSend: FiniteDuration,
                             waitForReceive: FiniteDuration): Iterable[String] = {
-    val start = java.lang.System.currentTimeMillis
-    println(s"Send message to topic.\n")
-    val sent = Await.result(producer.send(topic, message), waitForSend)
-    println(s"Successfully sent message to topic: ${sent}\n")
-    println(s"Receiving message from topic.\n")
-    val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") }
-    val end = java.lang.System.currentTimeMillis
-    val elapsed = end - start
-    println(s"Received ${received.size}. Took $elapsed msec: $received\n")
-
-    received
+    retry {
+      val start = java.lang.System.currentTimeMillis
+      println(s"Send message to topic.")
+      val sent = Await.result(producer.send(topic, message), waitForSend)
+      println(s"Successfully sent message to topic: $sent")
+      println(s"Receiving message from topic.")
+      val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") }
+      val end = java.lang.System.currentTimeMillis
+      val elapsed = end - start
+      println(s"Received ${received.size}. Took $elapsed msec: $received")
+
+      received.last should be(message.serialize)
+      received
+    }
   }
 
+  def createMessage(): Message = new Message { override val serialize: String = Calendar.getInstance.getTime.toString }
+
   behavior of "Kafka connector"
 
   it should "send and receive a kafka message which sets up the topic" in {
     for (i <- 0 until 5) {
-      val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
+      val message = createMessage()
       val received = sendAndReceiveMessage(message, 20 seconds, 10 seconds)
       received.size should be >= 1
-      received.last should be(message.serialize)
       consumer.commit()
     }
   }
 
   it should "send and receive a kafka message even after session timeout" in {
-    for (i <- 0 until 4) {
-      val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
-      val received = sendAndReceiveMessage(message, 1 seconds, 1 seconds)
+    // "clear" the topic so there are 0 messages to be read
+    sendAndReceiveMessage(createMessage(), 1 seconds, 1 seconds)
+    consumer.commit()
 
-      // only the last iteration will have an updated cursor
-      // iteration 0: get whatever is on the topic (at least 1 but may be more if a previous test failed)
-      // iteration 1: get iteration 0 records + 1 more (since we intentionally failed the commit on previous iteration)
-      // iteration 2: get iteration 1 records + 1 more (since we intentionally failed the commit on previous iteration)
-      // iteration 3: get exactly 1 records since iteration 2 should have forwarded the cursor
-      if (i < 3) {
-        received.size should be >= i + 1
-      } else {
-        received.size should be(1)
-      }
-      received.last should be(message.serialize)
+    (1 to 2).foreach { i =>
+      val message = createMessage()
+      val received = sendAndReceiveMessage(message, 1 seconds, 1 seconds)
+      received.size shouldBe i // should accumulate since the commits fail
 
-      if (i < 2) {
-        Thread.sleep((maxPollInterval + 1.second).toMillis)
-        a[CommitFailedException] should be thrownBy {
-          consumer.commit() // sleep should cause commit to fail
-        }
-      } else consumer.commit()
+      Thread.sleep((maxPollInterval + 1.second).toMillis)
+      a[CommitFailedException] should be thrownBy consumer.commit()
     }
+
+    val message3 = createMessage()
+    val received3 = sendAndReceiveMessage(message3, 1 seconds, 1 seconds)
+    received3.size shouldBe 2 + 1 // since the last commit still failed
+    consumer.commit()
+
+    val message4 = createMessage()
+    val received4 = sendAndReceiveMessage(message4, 1 seconds, 1 seconds)
+    received4.size shouldBe 1
+    consumer.commit()
   }
 
   if (kafkaHosts.length > 1) {
     it should "send and receive a kafka message even after shutdown one of instances" in {
-      for (i <- 0 until kafkaHosts.length) {
-        val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
+      kafkaHosts.indices.foreach { i =>
+        val message = createMessage()
         val kafkaHost = kafkaHosts(i).split(":")(0)
         val startLog = s", started"
         val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
 
         commandComponent(kafkaHost, "stop", s"kafka$i")
-        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size (1)
+        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1
         consumer.commit()
 
         commandComponent(kafkaHost, "start", s"kafka$i")
@@ -158,7 +157,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
             .length shouldBe prevCount + 1
         }, 20, Some(1.second)) // wait until kafka is up
 
-        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size (1)
+        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1
         consumer.commit()
       }
     }

-- 
To stop receiving notification emails like this one, please contact
csantanapr@apache.org.