You are viewing a plain-text version of this content. The canonical link for it is here (link target not preserved in this plain-text copy).
Posted to commits@openwhisk.apache.org by ma...@apache.org on 2020/03/03 06:56:53 UTC

[openwhisk] branch master updated: Update Kafka clients to 2.4.0, make kafka related dependencies consistent. (#4844)

This is an automated email from the ASF dual-hosted git repository.

markusthoemmes pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 771a310  Update Kafka clients to 2.4.0, make kafka related dependencies consistent. (#4844)
771a310 is described below

commit 771a310935c9daa10982f2d3e9b01f832d3d31d6
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Tue Mar 3 07:56:40 2020 +0100

    Update Kafka clients to 2.4.0, make kafka related dependencies consistent. (#4844)
---
 common/scala/build.gradle                                   |  2 +-
 core/monitoring/user-events/build.gradle                    |  1 +
 .../openwhisk/core/monitoring/metrics/EventConsumer.scala   | 13 ++++++-------
 settings.gradle                                             |  2 +-
 tests/build.gradle                                          |  2 +-
 5 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 9b6f098..4f72527 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -52,7 +52,7 @@ dependencies {
     compile "commons-codec:commons-codec:1.9"
     compile "commons-io:commons-io:2.6"
     compile "commons-collections:commons-collections:3.2.2"
-    compile "org.apache.kafka:kafka-clients:2.0.0"
+    compile "org.apache.kafka:kafka-clients:2.4.0"
     compile "org.apache.httpcomponents:httpclient:4.5.5"
     compile "com.fasterxml.uuid:java-uuid-generator:3.1.3"
     compile "com.github.ben-manes.caffeine:caffeine:2.6.2"
diff --git a/core/monitoring/user-events/build.gradle b/core/monitoring/user-events/build.gradle
index a657586..a458c8e 100644
--- a/core/monitoring/user-events/build.gradle
+++ b/core/monitoring/user-events/build.gradle
@@ -40,6 +40,7 @@ dependencies {
 
     testCompile "junit:junit:4.11"
     testCompile "org.scalatest:scalatest_${gradle.scala.depVersion}:3.0.8"
+    testCompile "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0"
     testCompile "com.typesafe.akka:akka-stream-kafka-testkit_${gradle.scala.depVersion}:${gradle.akka_kafka.version}"
     testCompile "com.typesafe.akka:akka-testkit_${gradle.scala.depVersion}:${gradle.akka.version}"
     testCompile "com.typesafe.akka:akka-stream-testkit_${gradle.scala.depVersion}:${gradle.akka.version}"
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
index 7b5709e..ab5023c 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
@@ -21,12 +21,11 @@ import java.lang.management.ManagementFactory
 
 import akka.Done
 import akka.actor.ActorSystem
-import akka.kafka.ConsumerMessage.CommittableOffsetBatch
-import akka.kafka.scaladsl.Consumer
+import akka.kafka.scaladsl.{Committer, Consumer}
 import akka.kafka.scaladsl.Consumer.DrainingControl
-import akka.kafka.{ConsumerSettings, Subscriptions}
+import akka.kafka.{CommitterSettings, ConsumerSettings, Subscriptions}
 import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.{Keep, Sink}
+import akka.stream.scaladsl.Keep
 import javax.management.ObjectName
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import kamon.Kamon
@@ -85,6 +84,8 @@ case class EventConsumer(settings: ConsumerSettings[String, String],
 
   override def metrics(): Future[Map[MetricName, common.Metric]] = control.metrics
 
+  private val committerSettings = CommitterSettings(system).withMaxBatch(20)
+
   //TODO Use RestartSource
   private val control: DrainingControl[Done] = Consumer
     .committableSource(updatedSettings, Subscriptions.topics(userEventTopic))
@@ -92,9 +93,7 @@ case class EventConsumer(settings: ConsumerSettings[String, String],
       processEvent(msg.record.value())
       msg.committableOffset
     }
-    .batch(max = 20, CommittableOffsetBatch(_))(_.updated(_))
-    .mapAsync(3)(_.commitScaladsl())
-    .toMat(Sink.ignore)(Keep.both)
+    .toMat(Committer.sink(committerSettings))(Keep.both)
     .mapMaterializedValue(DrainingControl.apply)
     .run()
 
diff --git a/settings.gradle b/settings.gradle
index 69ea863..501293f 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -46,7 +46,7 @@ gradle.ext.scalafmt = [
 ]
 
 gradle.ext.akka = [version : '2.5.26']
-gradle.ext.akka_kafka = [version : '1.1.0']
+gradle.ext.akka_kafka = [version : '2.0.2']
 gradle.ext.akka_http = [version : '10.1.11']
 gradle.ext.akka_management = [version : '1.0.5']
 
diff --git a/tests/build.gradle b/tests/build.gradle
index 396fb1f..fa9bff7 100644
--- a/tests/build.gradle
+++ b/tests/build.gradle
@@ -207,9 +207,9 @@ dependencies {
     compile "io.opentracing:opentracing-mock:0.31.0"
     compile "org.apache.curator:curator-test:${gradle.curator.version}"
     compile "com.atlassian.oai:swagger-request-validator-core:1.4.5"
+    compile "io.github.embeddedkafka:embedded-kafka_${gradle.scala.depVersion}:2.4.0"
     compile "com.typesafe.akka:akka-stream-kafka-testkit_${gradle.scala.depVersion}:${gradle.akka_kafka.version}"
     compile "com.typesafe.akka:akka-stream-testkit_${gradle.scala.depVersion}:${gradle.akka.version}"
-    compile "com.typesafe.akka:akka-stream-testkit_${gradle.scala.depVersion}:${gradle.akka.version}"
     compile "io.fabric8:kubernetes-server-mock:${gradle.kube_client.version}"
 
     compile "com.amazonaws:aws-java-sdk-s3:1.11.295"