You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ch...@apache.org on 2019/10/21 12:40:47 UTC

[openwhisk] branch master updated: Updates Alpakka Kafka to 1.1.0 and minor bug fixes in user-events (#4693)

This is an automated email from the ASF dual-hosted git repository.

chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 00cf75a  Updates Alpakka Kafka to 1.1.0 and minor bug fixes in user-events (#4693)
00cf75a is described below

commit 00cf75a2bf9389549432bed23af4ed7a130f75c8
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Mon Oct 21 18:10:29 2019 +0530

    Updates Alpakka Kafka to 1.1.0 and minor bug fixes in user-events (#4693)
    
    Updates Alpakka Kafka to 1.1.0 and also does minor bug fixes in user-events module.
    
    * Update Alpakka Kafka to 1.1.0
    * Move akka-stream-kafka-testkit to test scope
    * Turn off logging from embedded Kafka server
    * Reduce test execution time by avoiding recreating the consumer for each test
    * Set stop timeout to 0 as we use DrainingControl to properly stop the consumer upon shutdown
    * Make memoryGauge actually a gauge
    * Explicitly ignore the ConcurrentInvocations case for now
    * Update the Grafana chart to read the memory value directly, now that it is a Gauge instead of a histogram (as suggested by Cosmin)
---
 core/monitoring/user-events/build.gradle           |  2 +-
 .../grafana/dashboards/openwhisk_events.json       |  4 +--
 .../core/monitoring/metrics/EventConsumer.scala    |  2 +-
 .../core/monitoring/metrics/KamonRecorder.scala    |  7 ++---
 .../monitoring/metrics/PrometheusRecorder.scala    | 16 +++++------
 .../src/test/resources/logback-test.xml}           |  9 +++++--
 .../core/monitoring/metrics/ApiTests.scala         | 31 ++++++++++++++++------
 .../core/monitoring/metrics/KafkaSpecBase.scala    |  1 +
 .../metrics/PrometheusRecorderTests.scala          |  4 +--
 .../src/main/resources/logback-standalone.xml      |  2 +-
 settings.gradle                                    |  2 +-
 tests/src/test/resources/logback-test.xml          |  2 +-
 12 files changed, 50 insertions(+), 32 deletions(-)

diff --git a/core/monitoring/user-events/build.gradle b/core/monitoring/user-events/build.gradle
index c727b13..14a5363 100644
--- a/core/monitoring/user-events/build.gradle
+++ b/core/monitoring/user-events/build.gradle
@@ -41,7 +41,7 @@ dependencies {
 
     testCompile 'junit:junit:4.11'
     testCompile 'org.scalatest:scalatest_2.12:3.0.1'
-    compile "com.typesafe.akka:akka-stream-kafka-testkit_2.12:${gradle.akka_kafka.version}"
+    testCompile "com.typesafe.akka:akka-stream-kafka-testkit_2.12:${gradle.akka_kafka.version}"
     testCompile "com.typesafe.akka:akka-testkit_2.12:${gradle.akka.version}"
     testCompile "com.typesafe.akka:akka-stream-testkit_2.12:${gradle.akka.version}"
     testCompile "com.typesafe.akka:akka-http-testkit_2.12:${gradle.akka_http.version}"
diff --git a/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json b/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
index 441db67..d4a1d45 100644
--- a/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
+++ b/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
@@ -684,7 +684,7 @@
           "type": "string"
         },
         {
-          "alias": "Max memory",
+          "alias": "Memory",
           "colorMode": null,
           "colors": [
             "rgba(245, 54, 54, 0.9)",
@@ -716,7 +716,7 @@
       ],
       "targets": [
         {
-          "expr": "sum(increase(openwhisk_action_memory_sum{region=~\"$region\",stack=~\"$stack\",namespace=~\"$namespace\",action=~\"$action\",initiator=~\"$initiator\"}[$__range])) by (action) / sum(increase(openwhisk_action_memory_count{region=~\"$region\",stack=~\"$stack\",namespace=~\"$namespace\",action=~\"$action\"}[$__range])) by (action) > 0",
+          "expr": "openwhisk_action_memory{region=~\"$region\",stack=~\"$stack\",namespace=~\"$namespace\",action=~\"$action\",initiator=~\"$initiator\"}",
           "format": "time_series",
           "instant": false,
           "intervalFactor": 1,
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
index e3be921..65f245e 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
@@ -141,7 +141,7 @@ case class EventConsumer(settings: ConsumerSettings[String, String], recorders:
     settings
       .withProperty(ConsumerConfig.CLIENT_ID_CONFIG, id)
       .withProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, KamonMetricsReporter.name)
-
+      .withStopTimeout(Duration.Zero) // https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#draining-control
 }
 
 object EventConsumer {
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
index d0c0c67..5f480a7 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
@@ -66,9 +66,10 @@ object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogg
 
     def record(m: Metric): Unit = {
       m.metricName match {
-        case "ConcurrentRateLimit" => concurrentLimit.increment()
-        case "TimedRateLimit"      => timedLimit.increment()
-        case x                     => log.warn(s"Unknown limit $x")
+        case "ConcurrentRateLimit"   => concurrentLimit.increment()
+        case "TimedRateLimit"        => timedLimit.increment()
+        case "ConcurrentInvocations" => //TODO Handle ConcurrentInvocations
+        case x                       => log.warn(s"Unknown limit $x")
       }
     }
   }
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
index 42be2e4..79e232c 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
@@ -86,9 +86,10 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
 
     def record(m: Metric): Unit = {
       m.metricName match {
-        case "ConcurrentRateLimit" => concurrentLimit.inc()
-        case "TimedRateLimit"      => timedLimit.inc()
-        case x                     => log.warn(s"Unknown limit $x")
+        case "ConcurrentRateLimit"   => concurrentLimit.inc()
+        case "TimedRateLimit"        => timedLimit.inc()
+        case "ConcurrentInvocations" => //TODO Handle ConcurrentInvocations
+        case x                       => log.warn(s"Unknown limit $x")
       }
     }
   }
@@ -116,7 +117,7 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
       statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusWhiskError)
 
     def record(a: Activation): Unit = {
-      gauge.observe(a.memory)
+      gauge.set(a.memory)
 
       activations.inc()
 
@@ -208,12 +209,7 @@ object PrometheusRecorder extends PrometheusMetricNames {
       initiatorNamespace,
       actionName)
   private val memoryGauge =
-    histogram(
-      memoryMetric,
-      "Memory consumption of the action containers",
-      actionNamespace,
-      initiatorNamespace,
-      actionName)
+    gauge(memoryMetric, "Memory consumption of the action containers", actionNamespace, initiatorNamespace, actionName)
 
   private val concurrentLimitCounter =
     counter(concurrentLimitMetric, "a user has exceeded its limit for concurrent invocations", actionNamespace)
diff --git a/core/standalone/src/main/resources/logback-standalone.xml b/core/monitoring/user-events/src/test/resources/logback-test.xml
similarity index 82%
copy from core/standalone/src/main/resources/logback-standalone.xml
copy to core/monitoring/user-events/src/test/resources/logback-test.xml
index a7d5ed0..b485a6e 100644
--- a/core/standalone/src/main/resources/logback-standalone.xml
+++ b/core/monitoring/user-events/src/test/resources/logback-test.xml
@@ -20,7 +20,7 @@
     <jmxConfigurator></jmxConfigurator>
     <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
         <encoder>
-            <pattern>[%d{yyyy-MM-dd'T'HH:mm:ss.SSS'Z'}] %highlight([%p]) %msg%n</pattern>
+            <pattern>[%d{yyyy-MM-dd'T'HH:mm:ss.SSS'Z'}] [%p] [%logger] %msg%n</pattern>
         </encoder>
     </appender>
 
@@ -29,8 +29,13 @@
 
     <!-- Kafka -->
     <logger name="org.apache.kafka" level="ERROR" />
+    <logger name="kafka" level="ERROR" />
+
+    <!-- Zookeeper -->
     <logger name="org.apache.zookeeper" level="ERROR" />
-    <logger name="kafka.server" level="ERROR" />
+    <logger name="org.apache.curator" level="ERROR" />
+
+    <logger name="akka.event.slf4j.Slf4jLogger" level="WARN" />
 
     <root level="${logback.log.level:-INFO}">
         <appender-ref ref="console" />
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
index a2cd5f7..21d5d14 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
@@ -25,34 +25,49 @@ import org.junit.runner.RunWith
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.junit.JUnitRunner
 import org.scalatest.matchers.Matcher
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 
 import scala.concurrent.duration.DurationInt
 
 @RunWith(classOf[JUnitRunner])
-class ApiTests extends FlatSpec with Matchers with ScalatestRouteTest with EventsTestHelper with ScalaFutures {
+class ApiTests
+    extends FlatSpec
+    with Matchers
+    with ScalatestRouteTest
+    with EventsTestHelper
+    with ScalaFutures
+    with BeforeAndAfterAll {
   implicit val timeoutConfig = PatienceConfig(1.minute)
+
+  private var api: PrometheusEventsApi = _
+  private var consumer: EventConsumer = _
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    consumer = createConsumer(56754, system.settings.config)
+    api = new PrometheusEventsApi(consumer, createExporter())
+  }
+
+  protected override def afterAll(): Unit = {
+    consumer.shutdown().futureValue
+    super.afterAll()
+  }
+
   behavior of "EventsApi"
 
   it should "respond ping request" in {
-    val consumer = createConsumer(56754, system.settings.config)
-    val api = new PrometheusEventsApi(consumer, createExporter())
     Get("/ping") ~> api.routes ~> check {
       //Due to retries using a random port does not immediately result in failure
       handled shouldBe true
     }
-    consumer.shutdown().futureValue
   }
 
   it should "respond metrics request" in {
-    val consumer = createConsumer(56754, system.settings.config)
-    val api = new PrometheusEventsApi(consumer, createExporter())
     Get("/metrics") ~> `Accept-Encoding`(gzip) ~> api.routes ~> check {
       contentType.charsetOption shouldBe Some(HttpCharsets.`UTF-8`)
       contentType.mediaType.params("version") shouldBe "0.0.4"
       response should haveContentEncoding(gzip)
     }
-    consumer.shutdown().futureValue
   }
 
   private def haveContentEncoding(encoding: HttpEncoding): Matcher[HttpResponse] =
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
index 0a7519c..6c7be2e 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
@@ -38,4 +38,5 @@ abstract class KafkaSpecBase
   implicit val timeoutConfig: PatienceConfig = PatienceConfig(1.minute)
   implicit val materializer: ActorMaterializer = ActorMaterializer()
   override val sleepAfterProduce: FiniteDuration = 10.seconds
+  override protected val topicCreationTimeout = 60.seconds
 }
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
index e9dfe80..2241a8b 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
@@ -64,7 +64,7 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
     histogramCount(durationMetric, actionWithCustomPackage) shouldBe 1
     histogramSum(durationMetric, actionWithCustomPackage) shouldBe (1.254 +- 0.01)
 
-    gauge(memoryMetric, actionWithCustomPackage) shouldBe 1
+    gauge(memoryMetric, actionWithCustomPackage).intValue() shouldBe 256
 
     // Default package
     counterTotal(activationMetric, actionWithDefaultPackage) shouldBe 1
@@ -81,7 +81,7 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
 
   private def gauge(name: String, action: String) =
     CollectorRegistry.defaultRegistry.getSampleValue(
-      s"${name}_count",
+      name,
       Array("namespace", "initiator", "action"),
       Array(namespace, initiator, action))
 
diff --git a/core/standalone/src/main/resources/logback-standalone.xml b/core/standalone/src/main/resources/logback-standalone.xml
index a7d5ed0..feed1b1 100644
--- a/core/standalone/src/main/resources/logback-standalone.xml
+++ b/core/standalone/src/main/resources/logback-standalone.xml
@@ -30,7 +30,7 @@
     <!-- Kafka -->
     <logger name="org.apache.kafka" level="ERROR" />
     <logger name="org.apache.zookeeper" level="ERROR" />
-    <logger name="kafka.server" level="ERROR" />
+    <logger name="kafka" level="ERROR" />
 
     <root level="${logback.log.level:-INFO}">
         <appender-ref ref="console" />
diff --git a/settings.gradle b/settings.gradle
index 480daf8..a333e2c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -45,7 +45,7 @@ gradle.ext.scalafmt = [
 ]
 
 gradle.ext.akka = [version : '2.5.22']
-gradle.ext.akka_kafka = [version : '1.0.5']
+gradle.ext.akka_kafka = [version : '1.1.0']
 gradle.ext.akka_http = [version : '10.1.8']
 
 gradle.ext.curator = [version : '4.0.0']
diff --git a/tests/src/test/resources/logback-test.xml b/tests/src/test/resources/logback-test.xml
index 98ca59d..08f24c4 100644
--- a/tests/src/test/resources/logback-test.xml
+++ b/tests/src/test/resources/logback-test.xml
@@ -12,7 +12,7 @@
 
     <!-- Kafka -->
     <logger name="org.apache.kafka" level="ERROR" />
-    <logger name="kafka.server" level="ERROR" />
+    <logger name="kafka" level="ERROR" />
 
     <!-- Zookeeper -->
     <logger name="org.apache.zookeeper" level="ERROR" />