You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by bd...@apache.org on 2023/05/03 03:41:15 UTC

[openwhisk] branch master updated: upgrade kafka client library to 2.8.2 (#5400)

This is an automated email from the ASF dual-hosted git repository.

bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bc559d41 upgrade kafka client library to 2.8.2 (#5400)
6bc559d41 is described below

commit 6bc559d41f74525e26c9bd03d01dc4a3e9b5c658
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Tue May 2 20:41:07 2023 -0700

    upgrade kafka client library to 2.8.2 (#5400)
    
    * upgrade kafka client library
    
    * attempt build upgrading embedded kafka
    
    * attempt to fix standalone server startup test
    
    * bump kafka client to latest patch
    
    * revert kafka test timeout config change
    
    ---------
    
    Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
 common/scala/build.gradle                          |  2 +-
 core/monitoring/user-events/build.gradle           |  4 +--
 .../core/monitoring/metrics/KafkaSpecBase.scala    | 20 ++++++++++++---
 core/standalone/build.gradle                       |  4 +--
 .../openwhisk/standalone/KafkaLauncher.scala       | 10 +++++---
 settings.gradle                                    |  2 +-
 tests/build.gradle                                 |  5 ++--
 .../cosmosdb/cache/CacheInvalidatorTests.scala     | 30 ++++++++++++----------
 8 files changed, 47 insertions(+), 30 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index fbbe74a60..3c165e635 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -61,7 +61,7 @@ dependencies {
     api "commons-codec:commons-codec:1.9"
     api "commons-io:commons-io:2.11.0"
     api "commons-collections:commons-collections:3.2.2"
-    api "org.apache.kafka:kafka-clients:2.4.0"
+    api "org.apache.kafka:kafka-clients:2.8.2"
     api "org.apache.httpcomponents:httpclient:4.5.5"
     api "com.fasterxml.uuid:java-uuid-generator:3.1.3"
     api "com.github.ben-manes.caffeine:caffeine:2.6.2"
diff --git a/core/monitoring/user-events/build.gradle b/core/monitoring/user-events/build.gradle
index c6f4a9fc4..42269d1f6 100644
--- a/core/monitoring/user-events/build.gradle
+++ b/core/monitoring/user-events/build.gradle
@@ -45,9 +45,9 @@ dependencies {
 
     testImplementation "junit:junit:4.11"
     testImplementation "org.scalatest:scalatest_${gradle.scala.depVersion}:3.0.8"
-    testImplementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0"
+    testImplementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1"
     constraints {
-        testImplementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0")
+        testImplementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1")
         testImplementation('org.apache.avro:avro:1.11.1') {
             because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns'
         }
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
index 5ba749191..e9998b19f 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
@@ -17,8 +17,8 @@
 
 package org.apache.openwhisk.core.monitoring.metrics
 
-import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
-import net.manub.embeddedkafka.EmbeddedKafka
+import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.scalatest._
 import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
 
@@ -30,11 +30,25 @@ abstract class KafkaSpecBase
     with ScalaFutures
     with FlatSpecLike
     with EmbeddedKafka
-    with EmbeddedKafkaLike
     with IntegrationPatience
     with Eventually
     with EventsTestHelper { this: Suite =>
   implicit val timeoutConfig: PatienceConfig = PatienceConfig(1.minute)
   override val sleepAfterProduce: FiniteDuration = 10.seconds
   override protected val topicCreationTimeout = 60.seconds
+  override protected val producerPublishTimeout: FiniteDuration = 60.seconds
+
+  lazy implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
+
+  override def bootstrapServers = s"localhost:${embeddedKafkaConfig.kafkaPort}"
+
+  override def setUp(): Unit = {
+    EmbeddedKafka.start()(embeddedKafkaConfig)
+    super.setUp()
+  }
+
+  override def cleanUp(): Unit = {
+    super.cleanUp()
+    EmbeddedKafka.stop()
+  }
 }
diff --git a/core/standalone/build.gradle b/core/standalone/build.gradle
index 031e529d8..46c526b9b 100644
--- a/core/standalone/build.gradle
+++ b/core/standalone/build.gradle
@@ -169,9 +169,9 @@ dependencies {
     implementation project(':tools:admin')
     implementation "org.rogach:scallop_${gradle.scala.depVersion}:3.3.2"
 
-    implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0"
+    implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1"
     constraints {
-        implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0")
+        implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1")
         implementation('org.apache.avro:avro:1.11.1') {
             because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns'
         }
diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala
index 2c6790868..41e91e4a0 100644
--- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala
+++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala
@@ -18,10 +18,9 @@
 package org.apache.openwhisk.standalone
 
 import java.io.File
-
 import akka.actor.ActorSystem
 import kafka.server.KafkaConfig
-import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.commons.io.FileUtils
 import org.apache.openwhisk.common.{Logging, TransactionId}
 import org.apache.openwhisk.core.WhiskConfig
@@ -30,6 +29,7 @@ import org.apache.openwhisk.core.entity.ControllerInstanceId
 import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, LoadBalancerProvider}
 import org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, containerName, createRunCmd}
 
+import java.nio.file.FileSystems
 import scala.concurrent.{ExecutionContext, Future}
 import scala.reflect.io.Directory
 import scala.util.Try
@@ -66,8 +66,10 @@ class KafkaLauncher(
       EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, customBrokerProperties = brokerProps)
 
     val t = Try {
-      EmbeddedKafka.startZooKeeper(createDir("zookeeper"))
-      EmbeddedKafka.startKafka(createDir("kafka"))
+      createDir("zookeeper")
+      createDir("kafka")
+      EmbeddedKafka.startZooKeeper(FileSystems.getDefault.getPath(workDir.getPath,"zookeeper"))
+      EmbeddedKafka.startKafka(FileSystems.getDefault.getPath(workDir.getPath,"kafka"))
     }
 
     Future
diff --git a/settings.gradle b/settings.gradle
index 0ba8ae6be..9c9cca5e5 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -78,7 +78,7 @@ gradle.ext.scalafmt = [
 ]
 
 gradle.ext.akka = [version : '2.6.12']
-gradle.ext.akka_kafka = [version : '2.0.2']
+gradle.ext.akka_kafka = [version : '2.0.5']
 gradle.ext.akka_http = [version : '10.2.4']
 gradle.ext.akka_management = [version : '1.0.5']
 
diff --git a/tests/build.gradle b/tests/build.gradle
index fe0ef9eeb..61e07b258 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -239,9 +239,9 @@ dependencies {
             because 'swagger-request-validator-core cannot be upgraded to 2.x where vuln is remediated'
         }
     }
-    implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0"
+    implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1"
     constraints {
-        implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0")
+        implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1")
         implementation('org.apache.avro:avro:1.11.1') {
             because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns'
         }
@@ -255,7 +255,6 @@ dependencies {
     implementation "com.microsoft.azure:azure-cosmos:3.7.6"
     implementation 'org.testcontainers:elasticsearch:1.17.6'
     implementation 'org.testcontainers:mongodb:1.17.1'
-
     implementation project(':common:scala')
     implementation project(':core:controller')
     implementation project(':core:scheduler')
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
index e4d252801..a40ed64df 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
@@ -17,26 +17,17 @@
 
 package org.apache.openwhisk.core.database.cosmosdb.cache
 import java.net.UnknownHostException
-
 import akka.Done
 import akka.actor.CoordinatedShutdown
-import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
+import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
 import com.typesafe.config.ConfigFactory
-import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.openwhisk.common.{AkkaLogging, TransactionId}
 import org.apache.openwhisk.core.database.{CacheInvalidationMessage, RemoteCacheInvalidation}
 import org.apache.openwhisk.core.database.cosmosdb.{CosmosDBArtifactStoreProvider, CosmosDBTestSupport}
-import org.apache.openwhisk.core.entity.{
-  DocumentReader,
-  EntityName,
-  EntityPath,
-  WhiskDocumentReader,
-  WhiskEntity,
-  WhiskEntityJsonFormat,
-  WhiskPackage
-}
+import org.apache.openwhisk.core.entity.{DocumentReader, EntityName, EntityPath, WhiskDocumentReader, WhiskEntity, WhiskEntityJsonFormat, WhiskPackage}
 import org.junit.runner.RunWith
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
@@ -48,7 +39,6 @@ import scala.util.Random
 @RunWith(classOf[JUnitRunner])
 class CacheInvalidatorTests
     extends ScalatestKafkaSpec(6061)
-    with EmbeddedKafkaLike
     with EmbeddedKafka
     with CosmosDBTestSupport
     with Matchers
@@ -58,7 +48,19 @@ class CacheInvalidatorTests
   private implicit val logging = new AkkaLogging(system.log)
   implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = 300.seconds)
 
-  override def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
+  def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
+
+  override def bootstrapServers = s"localhost:$kafkaPort"
+
+  override def setUp(): Unit = {
+    EmbeddedKafka.start()(createKafkaConfig)
+    super.setUp()
+  }
+
+  override def cleanUp(): Unit = {
+    super.cleanUp()
+    EmbeddedKafka.stop()
+  }
 
   behavior of "CosmosDB CacheInvalidation"