You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by bd...@apache.org on 2023/05/03 03:41:15 UTC
[openwhisk] branch master updated: upgrade kafka client library to 2.8.2 (#5400)
This is an automated email from the ASF dual-hosted git repository.
bdoyle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 6bc559d41 upgrade kafka client library to 2.8.2 (#5400)
6bc559d41 is described below
commit 6bc559d41f74525e26c9bd03d01dc4a3e9b5c658
Author: Brendan Doyle <bd...@gmail.com>
AuthorDate: Tue May 2 20:41:07 2023 -0700
upgrade kafka client library to 2.8.2 (#5400)
* upgrade kafka client library
* attempt build upgrading embedded kafka
* attempt to fix standalone server startup test
* bump to kafka client to latest patch
* revert kafka test timeout config change
---------
Co-authored-by: Brendan Doyle <br...@qualtrics.com>
---
common/scala/build.gradle | 2 +-
core/monitoring/user-events/build.gradle | 4 +--
.../core/monitoring/metrics/KafkaSpecBase.scala | 20 ++++++++++++---
core/standalone/build.gradle | 4 +--
.../openwhisk/standalone/KafkaLauncher.scala | 10 +++++---
settings.gradle | 2 +-
tests/build.gradle | 5 ++--
.../cosmosdb/cache/CacheInvalidatorTests.scala | 30 ++++++++++++----------
8 files changed, 47 insertions(+), 30 deletions(-)
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index fbbe74a60..3c165e635 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -61,7 +61,7 @@ dependencies {
api "commons-codec:commons-codec:1.9"
api "commons-io:commons-io:2.11.0"
api "commons-collections:commons-collections:3.2.2"
- api "org.apache.kafka:kafka-clients:2.4.0"
+ api "org.apache.kafka:kafka-clients:2.8.2"
api "org.apache.httpcomponents:httpclient:4.5.5"
api "com.fasterxml.uuid:java-uuid-generator:3.1.3"
api "com.github.ben-manes.caffeine:caffeine:2.6.2"
diff --git a/core/monitoring/user-events/build.gradle b/core/monitoring/user-events/build.gradle
index c6f4a9fc4..42269d1f6 100644
--- a/core/monitoring/user-events/build.gradle
+++ b/core/monitoring/user-events/build.gradle
@@ -45,9 +45,9 @@ dependencies {
testImplementation "junit:junit:4.11"
testImplementation "org.scalatest:scalatest_${gradle.scala.depVersion}:3.0.8"
- testImplementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0"
+ testImplementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1"
constraints {
- testImplementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0")
+ testImplementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1")
testImplementation('org.apache.avro:avro:1.11.1') {
because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns'
}
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
index 5ba749191..e9998b19f 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
@@ -17,8 +17,8 @@
package org.apache.openwhisk.core.monitoring.metrics
-import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
-import net.manub.embeddedkafka.EmbeddedKafka
+import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.scalatest._
import org.scalatest.concurrent.{Eventually, IntegrationPatience, ScalaFutures}
@@ -30,11 +30,25 @@ abstract class KafkaSpecBase
with ScalaFutures
with FlatSpecLike
with EmbeddedKafka
- with EmbeddedKafkaLike
with IntegrationPatience
with Eventually
with EventsTestHelper { this: Suite =>
implicit val timeoutConfig: PatienceConfig = PatienceConfig(1.minute)
override val sleepAfterProduce: FiniteDuration = 10.seconds
override protected val topicCreationTimeout = 60.seconds
+ override protected val producerPublishTimeout: FiniteDuration = 60.seconds
+
+ lazy implicit val embeddedKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
+
+ override def bootstrapServers = s"localhost:${embeddedKafkaConfig.kafkaPort}"
+
+ override def setUp(): Unit = {
+ EmbeddedKafka.start()(embeddedKafkaConfig)
+ super.setUp()
+ }
+
+ override def cleanUp(): Unit = {
+ super.cleanUp()
+ EmbeddedKafka.stop()
+ }
}
diff --git a/core/standalone/build.gradle b/core/standalone/build.gradle
index 031e529d8..46c526b9b 100644
--- a/core/standalone/build.gradle
+++ b/core/standalone/build.gradle
@@ -169,9 +169,9 @@ dependencies {
implementation project(':tools:admin')
implementation "org.rogach:scallop_${gradle.scala.depVersion}:3.3.2"
- implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0"
+ implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1"
constraints {
- implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0")
+ implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1")
implementation('org.apache.avro:avro:1.11.1') {
because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns'
}
diff --git a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala
index 2c6790868..41e91e4a0 100644
--- a/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala
+++ b/core/standalone/src/main/scala/org/apache/openwhisk/standalone/KafkaLauncher.scala
@@ -18,10 +18,9 @@
package org.apache.openwhisk.standalone
import java.io.File
-
import akka.actor.ActorSystem
import kafka.server.KafkaConfig
-import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.commons.io.FileUtils
import org.apache.openwhisk.common.{Logging, TransactionId}
import org.apache.openwhisk.core.WhiskConfig
@@ -30,6 +29,7 @@ import org.apache.openwhisk.core.entity.ControllerInstanceId
import org.apache.openwhisk.core.loadBalancer.{LeanBalancer, LoadBalancer, LoadBalancerProvider}
import org.apache.openwhisk.standalone.StandaloneDockerSupport.{checkOrAllocatePort, containerName, createRunCmd}
+import java.nio.file.FileSystems
import scala.concurrent.{ExecutionContext, Future}
import scala.reflect.io.Directory
import scala.util.Try
@@ -66,8 +66,10 @@ class KafkaLauncher(
EmbeddedKafkaConfig(kafkaPort = kafkaPort, zooKeeperPort = zkPort, customBrokerProperties = brokerProps)
val t = Try {
- EmbeddedKafka.startZooKeeper(createDir("zookeeper"))
- EmbeddedKafka.startKafka(createDir("kafka"))
+ createDir("zookeeper")
+ createDir("kafka")
+ EmbeddedKafka.startZooKeeper(FileSystems.getDefault.getPath(workDir.getPath,"zookeeper"))
+ EmbeddedKafka.startKafka(FileSystems.getDefault.getPath(workDir.getPath,"kafka"))
}
Future
diff --git a/settings.gradle b/settings.gradle
index 0ba8ae6be..9c9cca5e5 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -78,7 +78,7 @@ gradle.ext.scalafmt = [
]
gradle.ext.akka = [version : '2.6.12']
-gradle.ext.akka_kafka = [version : '2.0.2']
+gradle.ext.akka_kafka = [version : '2.0.5']
gradle.ext.akka_http = [version : '10.2.4']
gradle.ext.akka_management = [version : '1.0.5']
diff --git a/tests/build.gradle b/tests/build.gradle
index fe0ef9eeb..61e07b258 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -239,9 +239,9 @@ dependencies {
because 'swagger-request-validator-core cannot be upgraded to 2.x where vuln is remediated'
}
}
- implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0"
+ implementation "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1"
constraints {
- implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0")
+ implementation("io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.8.1")
implementation('org.apache.avro:avro:1.11.1') {
because 'embeddedkafka dependency cannot be upgraded currently and avro in embedded kafka 2.4.0 has vulns'
}
@@ -255,7 +255,6 @@ dependencies {
implementation "com.microsoft.azure:azure-cosmos:3.7.6"
implementation 'org.testcontainers:elasticsearch:1.17.6'
implementation 'org.testcontainers:mongodb:1.17.1'
-
implementation project(':common:scala')
implementation project(':core:controller')
implementation project(':core:scheduler')
diff --git a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
index e4d252801..a40ed64df 100644
--- a/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
+++ b/tests/src/test/scala/org/apache/openwhisk/core/database/cosmosdb/cache/CacheInvalidatorTests.scala
@@ -17,26 +17,17 @@
package org.apache.openwhisk.core.database.cosmosdb.cache
import java.net.UnknownHostException
-
import akka.Done
import akka.actor.CoordinatedShutdown
-import akka.kafka.testkit.scaladsl.{EmbeddedKafkaLike, ScalatestKafkaSpec}
+import akka.kafka.testkit.scaladsl.ScalatestKafkaSpec
import com.typesafe.config.ConfigFactory
-import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.openwhisk.common.{AkkaLogging, TransactionId}
import org.apache.openwhisk.core.database.{CacheInvalidationMessage, RemoteCacheInvalidation}
import org.apache.openwhisk.core.database.cosmosdb.{CosmosDBArtifactStoreProvider, CosmosDBTestSupport}
-import org.apache.openwhisk.core.entity.{
- DocumentReader,
- EntityName,
- EntityPath,
- WhiskDocumentReader,
- WhiskEntity,
- WhiskEntityJsonFormat,
- WhiskPackage
-}
+import org.apache.openwhisk.core.entity.{DocumentReader, EntityName, EntityPath, WhiskDocumentReader, WhiskEntity, WhiskEntityJsonFormat, WhiskPackage}
import org.junit.runner.RunWith
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
@@ -48,7 +39,6 @@ import scala.util.Random
@RunWith(classOf[JUnitRunner])
class CacheInvalidatorTests
extends ScalatestKafkaSpec(6061)
- with EmbeddedKafkaLike
with EmbeddedKafka
with CosmosDBTestSupport
with Matchers
@@ -58,7 +48,19 @@ class CacheInvalidatorTests
private implicit val logging = new AkkaLogging(system.log)
implicit override val patienceConfig: PatienceConfig = PatienceConfig(timeout = 300.seconds)
- override def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
+ def createKafkaConfig: EmbeddedKafkaConfig = EmbeddedKafkaConfig(kafkaPort, zooKeeperPort)
+
+ override def bootstrapServers = s"localhost:$kafkaPort"
+
+ override def setUp(): Unit = {
+ EmbeddedKafka.start()(createKafkaConfig)
+ super.setUp()
+ }
+
+ override def cleanUp(): Unit = {
+ super.cleanUp()
+ EmbeddedKafka.stop()
+ }
behavior of "CosmosDB CacheInvalidation"