You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/10 19:07:30 UTC

[GitHub] csantanapr closed pull request #3271: Rewrite Kafka test and accomodate for eventual consistency.

csantanapr closed pull request #3271: Rewrite Kafka test and accomodate for eventual consistency.
URL: https://github.com/apache/incubator-openwhisk/pull/3271
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/tests/src/test/scala/services/KafkaConnectorTests.scala b/tests/src/test/scala/services/KafkaConnectorTests.scala
index d0c3019900..7761856285 100644
--- a/tests/src/test/scala/services/KafkaConnectorTests.scala
+++ b/tests/src/test/scala/services/KafkaConnectorTests.scala
@@ -20,30 +20,26 @@ package services
 import java.io.File
 import java.util.Calendar
 
-import scala.concurrent.Await
-import scala.concurrent.duration.{DurationInt, FiniteDuration}
-import scala.language.postfixOps
-import scala.util.Try
+import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
 import org.apache.kafka.clients.consumer.CommitFailedException
 import org.junit.runner.RunWith
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.FlatSpec
-import org.scalatest.Matchers
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import org.scalatest.junit.JUnitRunner
-import common.{StreamLogging, TestUtils, WhiskProperties, WskActorSystem}
 import whisk.common.TransactionId
-import whisk.connector.kafka.KafkaConsumerConnector
-import whisk.connector.kafka.KafkaProducerConnector
-import whisk.connector.kafka.KafkaMessagingProvider
+import whisk.connector.kafka.{KafkaConsumerConnector, KafkaMessagingProvider, KafkaProducerConnector}
 import whisk.core.WhiskConfig
 import whisk.core.connector.Message
-import whisk.utils.ExecutionContextFactory
-import whisk.utils.retry
+import whisk.utils.{retry, ExecutionContextFactory}
+
+import scala.concurrent.{Await, ExecutionContext}
+import scala.concurrent.duration.{DurationInt, FiniteDuration}
+import scala.language.postfixOps
+import scala.util.Try
 
 @RunWith(classOf[JUnitRunner])
 class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem with BeforeAndAfterAll with StreamLogging {
-  implicit val transid = TransactionId.testing
-  implicit val ec = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
+  implicit val transid: TransactionId = TransactionId.testing
+  implicit val ec: ExecutionContext = ExecutionContextFactory.makeCachedThreadPoolExecutionContext()
 
   val config = new WhiskConfig(WhiskConfig.kafkaHosts)
   assert(config.isValid)
@@ -54,14 +50,14 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
   // Need to overwrite replication factor for tests that shut down and start
   // Kafka instances intentionally. These tests will fail if there is more than
   // one Kafka host but a replication factor of 1.
-  val kafkaHosts = config.kafkaHosts.split(",")
-  val replicationFactor = kafkaHosts.length / 2 + 1
+  val kafkaHosts: Array[String] = config.kafkaHosts.split(",")
+  val replicationFactor: Int = kafkaHosts.length / 2 + 1
   System.setProperty("whisk.kafka.replication-factor", replicationFactor.toString)
-  println(s"Create test topic '${topic}' with replicationFactor=${replicationFactor}")
-  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic ${topic} failed")
+  println(s"Create test topic '$topic' with replicationFactor=$replicationFactor")
+  assert(KafkaMessagingProvider.ensureTopic(config, topic, topic), s"Creation of topic $topic failed")
 
-  val sessionTimeout = 10 seconds
-  val maxPollInterval = 10 seconds
+  val sessionTimeout: FiniteDuration = 10 seconds
+  val maxPollInterval: FiniteDuration = 10 seconds
   val producer = new KafkaProducerConnector(config.kafkaHosts, ec)
   val consumer = new KafkaConsumerConnector(
     config.kafkaHosts,
@@ -70,13 +66,13 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
     sessionTimeout = sessionTimeout,
     maxPollInterval = maxPollInterval)
 
-  override def afterAll() = {
+  override def afterAll(): Unit = {
     producer.close()
     consumer.close()
     super.afterAll()
   }
 
-  def commandComponent(host: String, command: String, component: String) = {
+  def commandComponent(host: String, command: String, component: String): TestUtils.RunResult = {
     def file(path: String) = Try(new File(path)).filter(_.exists).map(_.getAbsolutePath).toOption
     val docker = (file("/usr/bin/docker") orElse file("/usr/local/bin/docker")).getOrElse("docker")
     val dockerPort = WhiskProperties.getProperty(WhiskConfig.dockerPort)
@@ -88,67 +84,70 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
   def sendAndReceiveMessage(message: Message,
                             waitForSend: FiniteDuration,
                             waitForReceive: FiniteDuration): Iterable[String] = {
-    val start = java.lang.System.currentTimeMillis
-    println(s"Send message to topic.\n")
-    val sent = Await.result(producer.send(topic, message), waitForSend)
-    println(s"Successfully sent message to topic: ${sent}\n")
-    println(s"Receiving message from topic.\n")
-    val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") }
-    val end = java.lang.System.currentTimeMillis
-    val elapsed = end - start
-    println(s"Received ${received.size}. Took $elapsed msec: $received\n")
-
-    received
+    retry {
+      val start = java.lang.System.currentTimeMillis
+      println(s"Send message to topic.")
+      val sent = Await.result(producer.send(topic, message), waitForSend)
+      println(s"Successfully sent message to topic: $sent")
+      println(s"Receiving message from topic.")
+      val received = consumer.peek(waitForReceive).map { case (_, _, _, msg) => new String(msg, "utf-8") }
+      val end = java.lang.System.currentTimeMillis
+      val elapsed = end - start
+      println(s"Received ${received.size}. Took $elapsed msec: $received")
+
+      received.last should be(message.serialize)
+      received
+    }
   }
 
+  def createMessage(): Message = new Message { override val serialize: String = Calendar.getInstance.getTime.toString }
+
   behavior of "Kafka connector"
 
   it should "send and receive a kafka message which sets up the topic" in {
     for (i <- 0 until 5) {
-      val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
+      val message = createMessage()
       val received = sendAndReceiveMessage(message, 20 seconds, 10 seconds)
       received.size should be >= 1
-      received.last should be(message.serialize)
       consumer.commit()
     }
   }
 
   it should "send and receive a kafka message even after session timeout" in {
-    for (i <- 0 until 4) {
-      val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
-      val received = sendAndReceiveMessage(message, 1 seconds, 1 seconds)
+    // "clear" the topic so there are 0 messages to be read
+    sendAndReceiveMessage(createMessage(), 1 seconds, 1 seconds)
+    consumer.commit()
 
-      // only the last iteration will have an updated cursor
-      // iteration 0: get whatever is on the topic (at least 1 but may be more if a previous test failed)
-      // iteration 1: get iteration 0 records + 1 more (since we intentionally failed the commit on previous iteration)
-      // iteration 2: get iteration 1 records + 1 more (since we intentionally failed the commit on previous iteration)
-      // iteration 3: get exactly 1 records since iteration 2 should have forwarded the cursor
-      if (i < 3) {
-        received.size should be >= i + 1
-      } else {
-        received.size should be(1)
-      }
-      received.last should be(message.serialize)
+    (1 to 2).foreach { i =>
+      val message = createMessage()
+      val received = sendAndReceiveMessage(message, 1 seconds, 1 seconds)
+      received.size shouldBe i // should accumulate since the commits fail
 
-      if (i < 2) {
-        Thread.sleep((maxPollInterval + 1.second).toMillis)
-        a[CommitFailedException] should be thrownBy {
-          consumer.commit() // sleep should cause commit to fail
-        }
-      } else consumer.commit()
+      Thread.sleep((maxPollInterval + 1.second).toMillis)
+      a[CommitFailedException] should be thrownBy consumer.commit()
     }
+
+    val message3 = createMessage()
+    val received3 = sendAndReceiveMessage(message3, 1 seconds, 1 seconds)
+    received3.size shouldBe 2 + 1 // since the last commit still failed
+    consumer.commit()
+
+    val message4 = createMessage()
+    val received4 = sendAndReceiveMessage(message4, 1 seconds, 1 seconds)
+    received4.size shouldBe 1
+    consumer.commit()
   }
 
   if (kafkaHosts.length > 1) {
     it should "send and receive a kafka message even after shutdown one of instances" in {
-      for (i <- 0 until kafkaHosts.length) {
-        val message = new Message { override val serialize = Calendar.getInstance().getTime().toString }
+      kafkaHosts.indices.foreach { i =>
+        val message = createMessage()
         val kafkaHost = kafkaHosts(i).split(":")(0)
         val startLog = s", started"
         val prevCount = startLog.r.findAllMatchIn(commandComponent(kafkaHost, "logs", s"kafka$i").stdout).length
 
         commandComponent(kafkaHost, "stop", s"kafka$i")
-        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size (1)
+        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1
         consumer.commit()
 
         commandComponent(kafkaHost, "start", s"kafka$i")
@@ -158,7 +157,7 @@ class KafkaConnectorTests extends FlatSpec with Matchers with WskActorSystem wit
             .length shouldBe prevCount + 1
         }, 20, Some(1.second)) // wait until kafka is up
 
-        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size (1)
+        sendAndReceiveMessage(message, 30 seconds, 30 seconds) should have size 1
         consumer.commit()
       }
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services