You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ch...@apache.org on 2019/10/21 12:40:47 UTC
[openwhisk] branch master updated: Updates Alpakka Kafka to 1.1.0
and minor bug fixes in user-events (#4693)
This is an automated email from the ASF dual-hosted git repository.
chetanm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 00cf75a Updates Alpakka Kafka to 1.1.0 and minor bug fixes in user-events (#4693)
00cf75a is described below
commit 00cf75a2bf9389549432bed23af4ed7a130f75c8
Author: Chetan Mehrotra <ch...@apache.org>
AuthorDate: Mon Oct 21 18:10:29 2019 +0530
Updates Alpakka Kafka to 1.1.0 and minor bug fixes in user-events (#4693)
Updates Alpakka Kafka to 1.1.0 and also does minor bug fixes in user-events module.
* Update Alpakka Kafka to 1.1.0
* Move akka-stream-kafka-testkit to test scope
* Turn off logging from embedded Kafka server
* Reduce test execution time by avoiding recreating consumer for each test
* Set stop timeout to 0 as we use DrainingControl to properly stop the consumer upon shutdown
* Make memoryGauge actually a gauge
* Explicitly ignore the ConcurrentInvocations case for now
* Update Grafana chart to query the memory value directly, now that the metric is a Gauge instead of a histogram (as suggested by Cosmin)
---
core/monitoring/user-events/build.gradle | 2 +-
.../grafana/dashboards/openwhisk_events.json | 4 +--
.../core/monitoring/metrics/EventConsumer.scala | 2 +-
.../core/monitoring/metrics/KamonRecorder.scala | 7 ++---
.../monitoring/metrics/PrometheusRecorder.scala | 16 +++++------
.../src/test/resources/logback-test.xml} | 9 +++++--
.../core/monitoring/metrics/ApiTests.scala | 31 ++++++++++++++++------
.../core/monitoring/metrics/KafkaSpecBase.scala | 1 +
.../metrics/PrometheusRecorderTests.scala | 4 +--
.../src/main/resources/logback-standalone.xml | 2 +-
settings.gradle | 2 +-
tests/src/test/resources/logback-test.xml | 2 +-
12 files changed, 50 insertions(+), 32 deletions(-)
diff --git a/core/monitoring/user-events/build.gradle b/core/monitoring/user-events/build.gradle
index c727b13..14a5363 100644
--- a/core/monitoring/user-events/build.gradle
+++ b/core/monitoring/user-events/build.gradle
@@ -41,7 +41,7 @@ dependencies {
testCompile 'junit:junit:4.11'
testCompile 'org.scalatest:scalatest_2.12:3.0.1'
- compile "com.typesafe.akka:akka-stream-kafka-testkit_2.12:${gradle.akka_kafka.version}"
+ testCompile "com.typesafe.akka:akka-stream-kafka-testkit_2.12:${gradle.akka_kafka.version}"
testCompile "com.typesafe.akka:akka-testkit_2.12:${gradle.akka.version}"
testCompile "com.typesafe.akka:akka-stream-testkit_2.12:${gradle.akka.version}"
testCompile "com.typesafe.akka:akka-http-testkit_2.12:${gradle.akka_http.version}"
diff --git a/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json b/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
index 441db67..d4a1d45 100644
--- a/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
+++ b/core/monitoring/user-events/compose/grafana/dashboards/openwhisk_events.json
@@ -684,7 +684,7 @@
"type": "string"
},
{
- "alias": "Max memory",
+ "alias": "Memory",
"colorMode": null,
"colors": [
"rgba(245, 54, 54, 0.9)",
@@ -716,7 +716,7 @@
],
"targets": [
{
- "expr": "sum(increase(openwhisk_action_memory_sum{region=~\"$region\",stack=~\"$stack\",namespace=~\"$namespace\",action=~\"$action\",initiator=~\"$initiator\"}[$__range])) by (action) / sum(increase(openwhisk_action_memory_count{region=~\"$region\",stack=~\"$stack\",namespace=~\"$namespace\",action=~\"$action\"}[$__range])) by (action) > 0",
+ "expr": "openwhisk_action_memory{region=~\"$region\",stack=~\"$stack\",namespace=~\"$namespace\",action=~\"$action\",initiator=~\"$initiator\"}",
"format": "time_series",
"instant": false,
"intervalFactor": 1,
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
index e3be921..65f245e 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
@@ -141,7 +141,7 @@ case class EventConsumer(settings: ConsumerSettings[String, String], recorders:
settings
.withProperty(ConsumerConfig.CLIENT_ID_CONFIG, id)
.withProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, KamonMetricsReporter.name)
-
+ .withStopTimeout(Duration.Zero) // https://doc.akka.io/docs/alpakka-kafka/current/consumer.html#draining-control
}
object EventConsumer {
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
index d0c0c67..5f480a7 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/KamonRecorder.scala
@@ -66,9 +66,10 @@ object KamonRecorder extends MetricRecorder with KamonMetricNames with SLF4JLogg
def record(m: Metric): Unit = {
m.metricName match {
- case "ConcurrentRateLimit" => concurrentLimit.increment()
- case "TimedRateLimit" => timedLimit.increment()
- case x => log.warn(s"Unknown limit $x")
+ case "ConcurrentRateLimit" => concurrentLimit.increment()
+ case "TimedRateLimit" => timedLimit.increment()
+ case "ConcurrentInvocations" => //TODO Handle ConcurrentInvocations
+ case x => log.warn(s"Unknown limit $x")
}
}
}
diff --git a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
index 42be2e4..79e232c 100644
--- a/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
+++ b/core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorder.scala
@@ -86,9 +86,10 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
def record(m: Metric): Unit = {
m.metricName match {
- case "ConcurrentRateLimit" => concurrentLimit.inc()
- case "TimedRateLimit" => timedLimit.inc()
- case x => log.warn(s"Unknown limit $x")
+ case "ConcurrentRateLimit" => concurrentLimit.inc()
+ case "TimedRateLimit" => timedLimit.inc()
+ case "ConcurrentInvocations" => //TODO Handle ConcurrentInvocations
+ case x => log.warn(s"Unknown limit $x")
}
}
}
@@ -116,7 +117,7 @@ case class PrometheusRecorder(kamon: PrometheusReporter)
statusCounter.labels(namespace, initiatorNamespace, action, ActivationResponse.statusWhiskError)
def record(a: Activation): Unit = {
- gauge.observe(a.memory)
+ gauge.set(a.memory)
activations.inc()
@@ -208,12 +209,7 @@ object PrometheusRecorder extends PrometheusMetricNames {
initiatorNamespace,
actionName)
private val memoryGauge =
- histogram(
- memoryMetric,
- "Memory consumption of the action containers",
- actionNamespace,
- initiatorNamespace,
- actionName)
+ gauge(memoryMetric, "Memory consumption of the action containers", actionNamespace, initiatorNamespace, actionName)
private val concurrentLimitCounter =
counter(concurrentLimitMetric, "a user has exceeded its limit for concurrent invocations", actionNamespace)
diff --git a/core/standalone/src/main/resources/logback-standalone.xml b/core/monitoring/user-events/src/test/resources/logback-test.xml
similarity index 82%
copy from core/standalone/src/main/resources/logback-standalone.xml
copy to core/monitoring/user-events/src/test/resources/logback-test.xml
index a7d5ed0..b485a6e 100644
--- a/core/standalone/src/main/resources/logback-standalone.xml
+++ b/core/monitoring/user-events/src/test/resources/logback-test.xml
@@ -20,7 +20,7 @@
<jmxConfigurator></jmxConfigurator>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
- <pattern>[%d{yyyy-MM-dd'T'HH:mm:ss.SSS'Z'}] %highlight([%p]) %msg%n</pattern>
+ <pattern>[%d{yyyy-MM-dd'T'HH:mm:ss.SSS'Z'}] [%p] [%logger] %msg%n</pattern>
</encoder>
</appender>
@@ -29,8 +29,13 @@
<!-- Kafka -->
<logger name="org.apache.kafka" level="ERROR" />
+ <logger name="kafka" level="ERROR" />
+
+ <!-- Zookeeper -->
<logger name="org.apache.zookeeper" level="ERROR" />
- <logger name="kafka.server" level="ERROR" />
+ <logger name="org.apache.curator" level="ERROR" />
+
+ <logger name="akka.event.slf4j.Slf4jLogger" level="WARN" />
<root level="${logback.log.level:-INFO}">
<appender-ref ref="console" />
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
index a2cd5f7..21d5d14 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/ApiTests.scala
@@ -25,34 +25,49 @@ import org.junit.runner.RunWith
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.Matcher
-import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
import scala.concurrent.duration.DurationInt
@RunWith(classOf[JUnitRunner])
-class ApiTests extends FlatSpec with Matchers with ScalatestRouteTest with EventsTestHelper with ScalaFutures {
+class ApiTests
+ extends FlatSpec
+ with Matchers
+ with ScalatestRouteTest
+ with EventsTestHelper
+ with ScalaFutures
+ with BeforeAndAfterAll {
implicit val timeoutConfig = PatienceConfig(1.minute)
+
+ private var api: PrometheusEventsApi = _
+ private var consumer: EventConsumer = _
+
+ override protected def beforeAll(): Unit = {
+ super.beforeAll()
+ consumer = createConsumer(56754, system.settings.config)
+ api = new PrometheusEventsApi(consumer, createExporter())
+ }
+
+ protected override def afterAll(): Unit = {
+ consumer.shutdown().futureValue
+ super.afterAll()
+ }
+
behavior of "EventsApi"
it should "respond ping request" in {
- val consumer = createConsumer(56754, system.settings.config)
- val api = new PrometheusEventsApi(consumer, createExporter())
Get("/ping") ~> api.routes ~> check {
//Due to retries using a random port does not immediately result in failure
handled shouldBe true
}
- consumer.shutdown().futureValue
}
it should "respond metrics request" in {
- val consumer = createConsumer(56754, system.settings.config)
- val api = new PrometheusEventsApi(consumer, createExporter())
Get("/metrics") ~> `Accept-Encoding`(gzip) ~> api.routes ~> check {
contentType.charsetOption shouldBe Some(HttpCharsets.`UTF-8`)
contentType.mediaType.params("version") shouldBe "0.0.4"
response should haveContentEncoding(gzip)
}
- consumer.shutdown().futureValue
}
private def haveContentEncoding(encoding: HttpEncoding): Matcher[HttpResponse] =
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
index 0a7519c..6c7be2e 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/KafkaSpecBase.scala
@@ -38,4 +38,5 @@ abstract class KafkaSpecBase
implicit val timeoutConfig: PatienceConfig = PatienceConfig(1.minute)
implicit val materializer: ActorMaterializer = ActorMaterializer()
override val sleepAfterProduce: FiniteDuration = 10.seconds
+ override protected val topicCreationTimeout = 60.seconds
}
diff --git a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
index e9dfe80..2241a8b 100644
--- a/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
+++ b/core/monitoring/user-events/src/test/scala/org/apache/openwhisk/core/monitoring/metrics/PrometheusRecorderTests.scala
@@ -64,7 +64,7 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
histogramCount(durationMetric, actionWithCustomPackage) shouldBe 1
histogramSum(durationMetric, actionWithCustomPackage) shouldBe (1.254 +- 0.01)
- gauge(memoryMetric, actionWithCustomPackage) shouldBe 1
+ gauge(memoryMetric, actionWithCustomPackage).intValue() shouldBe 256
// Default package
counterTotal(activationMetric, actionWithDefaultPackage) shouldBe 1
@@ -81,7 +81,7 @@ class PrometheusRecorderTests extends KafkaSpecBase with BeforeAndAfterEach with
private def gauge(name: String, action: String) =
CollectorRegistry.defaultRegistry.getSampleValue(
- s"${name}_count",
+ name,
Array("namespace", "initiator", "action"),
Array(namespace, initiator, action))
diff --git a/core/standalone/src/main/resources/logback-standalone.xml b/core/standalone/src/main/resources/logback-standalone.xml
index a7d5ed0..feed1b1 100644
--- a/core/standalone/src/main/resources/logback-standalone.xml
+++ b/core/standalone/src/main/resources/logback-standalone.xml
@@ -30,7 +30,7 @@
<!-- Kafka -->
<logger name="org.apache.kafka" level="ERROR" />
<logger name="org.apache.zookeeper" level="ERROR" />
- <logger name="kafka.server" level="ERROR" />
+ <logger name="kafka" level="ERROR" />
<root level="${logback.log.level:-INFO}">
<appender-ref ref="console" />
diff --git a/settings.gradle b/settings.gradle
index 480daf8..a333e2c 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -45,7 +45,7 @@ gradle.ext.scalafmt = [
]
gradle.ext.akka = [version : '2.5.22']
-gradle.ext.akka_kafka = [version : '1.0.5']
+gradle.ext.akka_kafka = [version : '1.1.0']
gradle.ext.akka_http = [version : '10.1.8']
gradle.ext.curator = [version : '4.0.0']
diff --git a/tests/src/test/resources/logback-test.xml b/tests/src/test/resources/logback-test.xml
index 98ca59d..08f24c4 100644
--- a/tests/src/test/resources/logback-test.xml
+++ b/tests/src/test/resources/logback-test.xml
@@ -12,7 +12,7 @@
<!-- Kafka -->
<logger name="org.apache.kafka" level="ERROR" />
- <logger name="kafka.server" level="ERROR" />
+ <logger name="kafka" level="ERROR" />
<!-- Zookeeper -->
<logger name="org.apache.zookeeper" level="ERROR" />