Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2020/04/21 07:23:28 UTC

[GitHub] [openwhisk] upgle opened a new pull request #4887: Add RestartSource to restart failing Kafka consumer for user-events service

upgle opened a new pull request #4887:
URL: https://github.com/apache/openwhisk/pull/4887


   <!--- Provide a concise summary of your changes in the Title -->
   
   ## Description
   <!--- Provide a detailed description of your changes. -->
   <!--- Include details of what problem you are solving and how your changes are tested. -->
   If the Kafka consumer fails, the user-events service stops sending metrics, because the user-events Kafka consumer has no error handling.
   
   https://doc.akka.io/docs/alpakka-kafka/current/errorhandling.html
   
   So I've added RestartSource to restart a failing consumer.
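   To make "restart a failing consumer" concrete, here is a simplified, synchronous model of restart-on-failure with backoff: re-run a failing operation up to `maxRestarts` times, sleeping for a growing delay between attempts. This is not Akka or Alpakka code. `RestartSketch`, `runWithRestarts`, `delayFor` and the injected `sleep` function are hypothetical names, and the sketch deliberately ignores details of the real `RestartSource`, such as resetting the restart count after a quiet period or what exactly happens once the cap is reached.

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object RestartSketch {
  // Hypothetical helper: run `attempt`, restarting it after each failure
  // until it succeeds or `maxRestarts` restarts have been used up.
  def runWithRestarts[T](maxRestarts: Int, delayFor: Int => FiniteDuration, sleep: FiniteDuration => Unit)(
    attempt: () => T): Try[T] = {
    @annotation.tailrec
    def loop(restarts: Int): Try[T] =
      Try(attempt()) match {
        case ok @ Success(_) => ok
        case Failure(_) if restarts < maxRestarts =>
          sleep(delayFor(restarts)) // back off before the next restart
          loop(restarts + 1)
        case failed => failed // cap reached: give up and surface the last failure
      }
    loop(0)
  }
}
```

   Injecting `sleep` keeps the sketch testable; a caller could pass `d => Thread.sleep(d.toMillis)`.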
   
   
   ## Related issue and scope
   - no related issue
   
   ## My changes affect the following components
   <!--- Select below all system components that are affected by your change. -->
   <!--- Enter an `x` in all applicable boxes. -->
   -  User events
   
   ## Types of changes
   <!--- What types of changes does your code introduce? Use `x` in all the boxes that apply: -->
   - [ ] Bug fix (generally a non-breaking change which closes an issue).
   - [x] Enhancement or new feature (adds new functionality).
   - [ ] Breaking change (a bug fix or enhancement which changes existing behavior).
   
   ## Checklist:
   <!--- Please review the points below which help you make sure you've covered all aspects of the change you're making. -->
   
   - [x] I signed an [Apache CLA](https://github.com/apache/openwhisk/blob/master/CONTRIBUTING.md).
   - [x] I reviewed the [style guides](https://github.com/apache/openwhisk/wiki/Contributing:-Git-guidelines#code-readiness) and followed the recommendations (Travis CI will check :).
   - [ ] I added tests to cover my changes.
   - [x] My changes require further changes to the documentation.
   - [ ] I updated the documentation where necessary.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] upgle commented on a change in pull request #4887: Add RestartSource to restart failing Kafka consumer for user-events service

Posted by GitBox <gi...@apache.org>.
upgle commented on a change in pull request #4887:
URL: https://github.com/apache/openwhisk/pull/4887#discussion_r412184898



##########
File path: core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
##########
@@ -29,11 +29,14 @@ import org.apache.kafka.common.serialization.StringDeserializer
 import pureconfig._
 import pureconfig.generic.auto._
 
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{ExecutionContext, Future}
 
 object OpenWhiskEvents extends SLF4JLogging {
 
-  case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String])
+  case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String], retry: RetryConfig)
+
+  case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)

Review comment:
       @chetanmeh Thank you for your review. I've reused the RetryConfig code from:
     
   https://github.com/apache/openwhisk/pull/4632/files#diff-3753d510c1485477c63054dde55b759dR39
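   The four `RetryConfig` fields correspond to the backoff parameters passed to `RestartSource.onFailuresWithBackoff`. As a sketch of how they interact, the delay before restart number n can be modelled as `minBackoff * 2^n`, capped at `maxBackoff`, then stretched by a random jitter of up to `randomFactor`. This is modelled on the exponential backoff Akka uses, but treat it as an approximation; `BackoffSketch.delay` and its `jitter` parameter are hypothetical, and exact behavior may vary between Akka versions.

```scala
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.duration._

// Mirrors the fields of the RetryConfig case class in the diff above.
final case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)

object BackoffSketch {
  // Delay before restart number `restartCount` (0-based): exponential growth from
  // minBackoff, capped at maxBackoff, then multiplied by a factor in [1, 1 + randomFactor).
  def delay(cfg: RetryConfig, restartCount: Int, jitter: Double = ThreadLocalRandom.current().nextDouble()): FiniteDuration = {
    val exponential = cfg.minBackoff.toNanos.toDouble * math.pow(2, restartCount)
    val capped = math.min(exponential, cfg.maxBackoff.toNanos.toDouble)
    (capped * (1.0 + jitter * cfg.randomFactor)).toLong.nanos
  }
}
```

   Note that in this model the jitter is applied after the cap, so an individual delay can exceed `maxBackoff` by up to `randomFactor` (for example, up to 36 seconds with a 30-second cap and a factor of 0.2).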







[GitHub] [openwhisk] upgle commented on a change in pull request #4887: Add RestartSource to restart failing Kafka consumer for user-events service

Posted by GitBox <gi...@apache.org>.
upgle commented on a change in pull request #4887:
URL: https://github.com/apache/openwhisk/pull/4887#discussion_r412281730



##########
File path: core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
##########
@@ -29,11 +29,14 @@ import org.apache.kafka.common.serialization.StringDeserializer
 import pureconfig._
 import pureconfig.generic.auto._
 
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{ExecutionContext, Future}
 
 object OpenWhiskEvents extends SLF4JLogging {
 
-  case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String])
+  case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String], retry: RetryConfig)
+
+  case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)

Review comment:
       @chetanmeh Sorry, I forgot to mention that I based this on your code 😅 







[GitHub] [openwhisk] upgle commented on issue #4887: Add RestartSource to restart failing Kafka consumer for user-events service

Posted by GitBox <gi...@apache.org>.
upgle commented on issue #4887:
URL: https://github.com/apache/openwhisk/pull/4887#issuecomment-617251375


   > Note that in k8s setups, if the user-events Kafka stream fails, the health check should fail as well, and that should cause the pod to restart. So logically the existing flow should handle Kafka failures fine. Having RestartSource, though, improves this.
   
   I found that the `/ping` API checks whether the consumer is running. I'll set up a livenessProbe as well. Thanks!
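   The `EventConsumer.isRunning` check (`!control.isShutdown.isCompleted`) treats the consumer as running until the control's shutdown future completes. A minimal sketch of how a `/ping`-style liveness endpoint could map that to an HTTP status, using a plain `Promise` in place of Alpakka's `Consumer.Control`; `FakeControl`, `ConsumerHealth`, `pingStatus` and the 200/503 mapping are illustrative assumptions, not the service's actual route.

```scala
import scala.concurrent.{Future, Promise}

// Stand-in for a consumer control: `isShutdown` completes once the stream stops.
final class FakeControl {
  private val shutdownPromise = Promise[Unit]()
  def isShutdown: Future[Unit] = shutdownPromise.future
  def markStopped(): Unit = shutdownPromise.trySuccess(())
}

object ConsumerHealth {
  // Same shape as EventConsumer.isRunning: running until the shutdown future completes.
  def isRunning(control: FakeControl): Boolean = !control.isShutdown.isCompleted

  // Hypothetical liveness mapping: 200 while running, 503 once the consumer has stopped,
  // so a Kubernetes livenessProbe hitting this endpoint would fail and the pod would restart.
  def pingStatus(control: FakeControl): Int = if (isRunning(control)) 200 else 503
}
```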





[GitHub] [openwhisk] upgle commented on a change in pull request #4887: Add RestartSource to restart failing Kafka consumer for user-events service

Posted by GitBox <gi...@apache.org>.
upgle commented on a change in pull request #4887:
URL: https://github.com/apache/openwhisk/pull/4887#discussion_r412184898



##########
File path: core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
##########
@@ -29,11 +29,14 @@ import org.apache.kafka.common.serialization.StringDeserializer
 import pureconfig._
 import pureconfig.generic.auto._
 
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{ExecutionContext, Future}
 
 object OpenWhiskEvents extends SLF4JLogging {
 
-  case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String])
+  case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String], retry: RetryConfig)
+
+  case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)

Review comment:
       @chetanmeh Thank you for your review. I've reused the code from:
     
   https://github.com/apache/openwhisk/pull/4632/files#diff-3753d510c1485477c63054dde55b759dR39







[GitHub] [openwhisk] chetanmeh commented on a change in pull request #4887: Add RestartSource to restart failing Kafka consumer for user-events service

Posted by GitBox <gi...@apache.org>.
chetanmeh commented on a change in pull request #4887:
URL: https://github.com/apache/openwhisk/pull/4887#discussion_r412176055



##########
File path: core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/EventConsumer.scala
##########
@@ -77,25 +77,34 @@ case class EventConsumer(settings: ConsumerSettings[String, String],
 
   def shutdown(): Future[Done] = {
     lagRecorder.cancel()
-    control.drainAndShutdown()(system.dispatcher)
+    control.get().drainAndShutdown(result)(system.dispatcher)
   }
 
-  def isRunning: Boolean = !control.isShutdown.isCompleted
-
-  override def metrics(): Future[Map[MetricName, common.Metric]] = control.metrics
-
-  private val committerSettings = CommitterSettings(system).withMaxBatch(20)
-
-  //TODO Use RestartSource
-  private val control: DrainingControl[Done] = Consumer
-    .committableSource(updatedSettings, Subscriptions.topics(userEventTopic))
-    .map { msg =>
-      processEvent(msg.record.value())
-      msg.committableOffset
+  def isRunning: Boolean = !control.get().isShutdown.isCompleted
+
+  override def metrics(): Future[Map[MetricName, common.Metric]] = control.get().metrics
+
+  private val committerSettings = CommitterSettings(system)
+  private val control = new AtomicReference[Consumer.Control](Consumer.NoopControl)
+
+  private val result = RestartSource
+    .onFailuresWithBackoff(
+      minBackoff = metricConfig.retry.minBackoff,
+      maxBackoff = metricConfig.retry.maxBackoff,
+      randomFactor = metricConfig.retry.randomFactor,
+      maxRestarts = metricConfig.retry.maxRestarts) { () =>

Review comment:
       Maybe add a log statement so that we can later check how many restarts happened.
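   `RestartSource.onFailuresWithBackoff` calls its source factory (the `() =>` block in the diff) on the initial start and again on every restart, and each run materializes a new `Consumer.Control`, which is why the diff keeps the latest control in an `AtomicReference`. Following the review suggestion, here is a minimal sketch of counting and logging restarts by wrapping such a factory. `RestartTracking` and the `log` callback are hypothetical, and a plain type parameter `C` stands in for the control so the example runs without Akka.

```scala
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

// Hypothetical wrapper around a source factory. `C` stands in for the
// materialized control (Consumer.Control in the real consumer).
final class RestartTracking[C](initial: C, log: String => Unit) {
  private val starts = new AtomicInteger(0)
  val current = new AtomicReference[C](initial)

  // Every start after the first one is a restart.
  def restarts: Int = math.max(starts.get() - 1, 0)

  // Wrap the factory: record the start, log restarts, and publish the new control.
  def wrap(factory: () => C): () => C = () => {
    val n = starts.incrementAndGet()
    if (n > 1) log(s"Restarting Kafka consumer (restart #${n - 1})")
    val control = factory()
    current.set(control)
    control
  }
}
```

   In the real stream the control is the source's materialized value (captured, for example, with `mapMaterializedValue`) rather than the factory's return value; the sketch collapses the two to stay self-contained.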







[GitHub] [openwhisk] upgle commented on a change in pull request #4887: Add RestartSource to restart failing Kafka consumer for user-events service

Posted by GitBox <gi...@apache.org>.
upgle commented on a change in pull request #4887:
URL: https://github.com/apache/openwhisk/pull/4887#discussion_r412184898



##########
File path: core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
##########
@@ -29,11 +29,14 @@ import org.apache.kafka.common.serialization.StringDeserializer
 import pureconfig._
 import pureconfig.generic.auto._
 
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{ExecutionContext, Future}
 
 object OpenWhiskEvents extends SLF4JLogging {
 
-  case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String])
+  case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String], retry: RetryConfig)
+
+  case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)

Review comment:
       @chetanmeh Thank you for your review. I've reused the same configuration class from:
   https://github.com/apache/openwhisk/pull/4632/files#diff-3753d510c1485477c63054dde55b759dR39







[GitHub] [openwhisk] style95 commented on pull request #4887: Add RestartSource to restart failing Kafka consumer for user-events service

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #4887:
URL: https://github.com/apache/openwhisk/pull/4887#issuecomment-628600735


   @upgle Thanks for addressing this.





[GitHub] [openwhisk] upgle commented on a change in pull request #4887: Add RestartSource to restart failing Kafka consumer for user-events service

Posted by GitBox <gi...@apache.org>.
upgle commented on a change in pull request #4887:
URL: https://github.com/apache/openwhisk/pull/4887#discussion_r412184898



##########
File path: core/monitoring/user-events/src/main/scala/org/apache/openwhisk/core/monitoring/metrics/OpenWhiskEvents.scala
##########
@@ -29,11 +29,14 @@ import org.apache.kafka.common.serialization.StringDeserializer
 import pureconfig._
 import pureconfig.generic.auto._
 
+import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{ExecutionContext, Future}
 
 object OpenWhiskEvents extends SLF4JLogging {
 
-  case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String])
+  case class MetricConfig(port: Int, enableKamon: Boolean, ignoredNamespaces: Set[String], retry: RetryConfig)
+
+  case class RetryConfig(minBackoff: FiniteDuration, maxBackoff: FiniteDuration, randomFactor: Double, maxRestarts: Int)

Review comment:
       @chetanmeh Thank you for your review. I've reused the RetryConfig code from:
     
   https://github.com/apache/openwhisk/pull/4632/files#diff-3753d510c1485477c63054dde55b759dR47







[GitHub] [openwhisk] style95 merged pull request #4887: Add RestartSource to restart failing Kafka consumer for user-events service

Posted by GitBox <gi...@apache.org>.
style95 merged pull request #4887:
URL: https://github.com/apache/openwhisk/pull/4887


   





[GitHub] [openwhisk] style95 commented on pull request #4887: Add RestartSource to restart failing Kafka consumer for user-events service

Posted by GitBox <gi...@apache.org>.
style95 commented on pull request #4887:
URL: https://github.com/apache/openwhisk/pull/4887#issuecomment-625005961


   @upgle 
   It seems some configuration is missing for standalone OpenWhisk.
   Could you look into this?
   
   ```
   org.apache.openwhisk.core.monitoring.metrics.KamonRecorderTests > KamonConsumer should push user events to kamon FAILED
       pureconfig.error.ConfigReaderException: Cannot convert configuration to a org.apache.openwhisk.core.monitoring.metrics.OpenWhiskEvents$MetricConfig. Failures are:
         at 'user-events':
           - (file:/home/travis/build/apache/openwhisk/core/monitoring/user-events/build/resources/test/application.conf:18) Key not found: 'retry'.
           at pureconfig.package$.getResultOrThrow(package.scala:139)
           at pureconfig.package$.loadConfigOrThrow(package.scala:207)
           at org.apache.openwhisk.core.monitoring.metrics.EventsTestHelper.createConsumer(EventsTestHelper.scala:37)
           at org.apache.openwhisk.core.monitoring.metrics.EventsTestHelper.createConsumer$(EventsTestHelper.scala:31)
           at org.apache.openwhisk.core.monitoring.metrics.KafkaSpecBase.createConsumer(KafkaSpecBase.scala:28)
           at org.apache.openwhisk.core.monitoring.metrics.KamonRecorderTests.$anonfun$new$1(KamonRecorderTests.scala:72)
           at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
           at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
           at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
           at org.scalatest.Transformer.apply(Transformer.scala:22)
           at org.scalatest.Transformer.apply(Transformer.scala:20)
           at org.scalatest.FlatSpecLike$$anon$5.apply(FlatSpecLike.scala:1682)
           at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
           at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
           at org.apache.openwhisk.core.monitoring.metrics.KafkaSpecBase.withFixture(KafkaSpecBase.scala:28)
           at org.scalatest.FlatSpecLike.invokeWithFixture$1(FlatSpecLike.scala:1680)
           at org.scalatest.FlatSpecLike.$anonfun$runTest$1(FlatSpecLike.scala:1692)
           at org.scalatest.SuperEngine.runTestImpl(Engine.scala:286)
           at org.scalatest.FlatSpecLike.runTest(FlatSpecLike.scala:1692)
           at org.scalatest.FlatSpecLike.runTest$(FlatSpecLike.scala:1674)
           at org.apache.openwhisk.core.monitoring.metrics.KamonRecorderTests.org$scalatest$BeforeAndAfterEach$$super$runTest(KamonRecorderTests.scala:36)
           at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:221)
           at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:214)
           at org.apache.openwhisk.core.monitoring.metrics.KamonRecorderTests.runTest(KamonRecorderTests.scala:36)
   ```

