Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2020/08/26 16:02:18 UTC

[GitHub] [openwhisk] tysonnorris opened a new pull request #4951: add defensive code for dealing with loadbalancer concurrency tracking

tysonnorris opened a new pull request #4951:
URL: https://github.com/apache/openwhisk/pull/4951


   Adds defensive code for concurrency tracking in the sharding loadbalancer.
   
   ## Description
   We saw cases where the controller did not complete ack processing for some activations. The causes turned out to be:
   * a cluster state change resets the concurrency tracking (for concurrent activations, -c > 1)
   * the concurrency tracking logic did not tolerate these resets, which happen several times per cluster state change
   * as a result, ack message processing failed silently
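A minimal sketch of this failure mode, assuming a hypothetical `MissingEntryDemo` object whose `TrieMap` stands in for the loadbalancer's per-action tracking state (not the actual OpenWhisk code). After `reset()` clears the map, a direct `slots(id)` lookup throws `NoSuchElementException`; inside a `Future`, that exception only fails the `Future`, which goes unnoticed unless a callback inspects it.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try

// Hypothetical model of the failure mode; names are illustrative, not OpenWhisk's.
object MissingEntryDemo {
  // Per-action tracking state, keyed by action id.
  val slots = TrieMap[String, Int]("action-a" -> 2)

  // Simulates a cluster state change that recreates the tracking state.
  def reset(): Unit = slots.clear()

  // Direct lookup: apply() throws NoSuchElementException once the key is gone.
  def releaseUnsafe(id: String): Try[Int] = Try(slots(id) - 1)

  // Inside a Future, the same exception only fails the Future. Unless a
  // callback inspects that failure, nothing is logged.
  def releaseInFuture(id: String)(implicit ec: ExecutionContext): Future[Int] =
    Future(slots(id) - 1)
}
```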
   
   This PR does 2 things:
   * add defensive code to the concurrency tracking logic (to tolerate these state resets during cluster state changes)
   * add extra logging to message processing to increase visibility of failed message handling
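A minimal sketch of the defensive lookup, assuming a simplified tracker: `ActionSlots`, `ConcurrencyTracker`, `start`, `finish`, and `reset` are hypothetical names rather than the OpenWhisk classes, and memory is modeled as a plain counter instead of semaphore permits. The relevant part is that `get(...)` returns an `Option`, so releasing an entry that a reset already dropped becomes a no-op instead of an exception.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap

// Hypothetical per-action slot counter (illustrative, not OpenWhisk's class).
final class ActionSlots {
  private val inFlight = new AtomicInteger(0)

  def acquire(): Unit = { inFlight.incrementAndGet(); () }

  // True once the last in-flight activation for this action has finished.
  def release(): Boolean = inFlight.decrementAndGet() <= 0
}

// Hypothetical tracker whose map may be cleared by a cluster state change.
final class ConcurrencyTracker {
  private val slotsByAction = TrieMap.empty[String, ActionSlots]
  private val releasedMemory = new AtomicInteger(0)

  def start(actionId: String): Unit =
    slotsByAction.getOrElseUpdate(actionId, new ActionSlots).acquire()

  // Stand-in for state being recreated on a cluster membership event.
  def reset(): Unit = slotsByAction.clear()

  // Defensive release: look the entry up with get() instead of apply(), so a
  // missing entry (dropped by reset) is skipped instead of throwing.
  def finish(actionId: String, memoryMB: Int): Unit =
    slotsByAction.get(actionId).foreach { slots =>
      if (slots.release()) {
        releasedMemory.addAndGet(memoryMB)
        slotsByAction.remove(actionId)
      }
    }

  def memoryReleased: Int = releasedMemory.get
  def isTracked(actionId: String): Boolean = slotsByAction.contains(actionId)
}
```

In this sketch a release for a dropped entry is simply ignored, so whatever memory it would have freed is not returned by this path; the sketch assumes the rebuilt state accounts for it.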
   
   This was not caught in tests because reproducing the symptom requires cluster state changes across multiple controllers combined with concurrent activations.
   
   ## Related issue and scope
   <!--- Please include a link to a related issue if there is one. -->
   - [ ] I opened an issue to propose and discuss this change (#????)
   
   ## My changes affect the following components
   <!--- Select below all system components that are affected by your change. -->
   <!--- Enter an `x` in all applicable boxes. -->
   - [ ] API
   - [x] Controller
   - [ ] Message Bus (e.g., Kafka)
   - [x] Loadbalancer
   - [ ] Invoker
   - [ ] Intrinsic actions (e.g., sequences, conductors)
   - [ ] Data stores (e.g., CouchDB)
   - [ ] Tests
   - [ ] Deployment
   - [ ] CLI
   - [ ] General tooling
   - [ ] Documentation
   
   ## Types of changes
   <!--- What types of changes does your code introduce? Use `x` in all the boxes that apply: -->
   - [x] Bug fix (generally a non-breaking change which closes an issue).
   - [ ] Enhancement or new feature (adds new functionality).
   - [ ] Breaking change (a bug fix or enhancement which changes existing behavior).
   
   ## Checklist:
   <!--- Please review the points below which help you make sure you've covered all aspects of the change you're making. -->
   
   - [x] I signed an [Apache CLA](https://github.com/apache/openwhisk/blob/master/CONTRIBUTING.md).
   - [x] I reviewed the [style guides](https://github.com/apache/openwhisk/wiki/Contributing:-Git-guidelines#code-readiness) and followed the recommendations (Travis CI will check :).
   - [ ] I added tests to cover my changes.
   - [ ] My changes require further changes to the documentation.
   - [ ] I updated the documentation where necessary.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [openwhisk] tysonnorris commented on a change in pull request #4951: add defensive code for dealing with loadbalancer concurrency tracking

Posted by GitBox <gi...@apache.org>.
tysonnorris commented on a change in pull request #4951:
URL: https://github.com/apache/openwhisk/pull/4951#discussion_r485199861



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageConsumer.scala
##########
@@ -213,7 +213,13 @@ class MessageFeed(description: String,
       outstandingMessages = outstandingMessages.tail
 
       if (logHandoff) logging.debug(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)")
-      handler(bytes)
+      handler(bytes).andThen {
+        {
+          case Failure(e) =>
+            logging.error(this, s"Failed to process message for topic $topic : $e  (stack trace included)")
+            e.printStackTrace()

Review comment:
       It is now. I forgot to remove the `e.printStackTrace()` call when I changed the log line to include the stack trace... 😔 Fixing that.
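A minimal sketch of logging a handler failure once, with the stack trace carried in the same log line, assuming a hypothetical `SimpleLogger` trait and `withFailureLog` helper (OpenWhisk's own `Logging` API has a different signature). `andThen` runs the callback as a side effect and returns a `Future` with the original result, so callers still observe the failure.

```scala
import java.io.{PrintWriter, StringWriter}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Failure

// Hypothetical minimal logger; OpenWhisk's Logging trait looks different.
trait SimpleLogger { def error(msg: String): Unit }

object HandlerLogging {
  // Attach a failure log to the handler's Future. andThen returns a Future
  // with the same result as the original; only the logging is added.
  def withFailureLog[T](topic: String, log: SimpleLogger)(f: Future[T])(
      implicit ec: ExecutionContext): Future[T] =
    f.andThen {
      case Failure(e) =>
        // Render the stack trace into a string instead of printing to stderr.
        val sw = new StringWriter()
        e.printStackTrace(new PrintWriter(sw))
        // One log call carries both the message and the stack trace.
        log.error(s"Failed to process message for topic $topic: $e\n$sw")
    }
}
```

Rendering the trace via `printStackTrace(PrintWriter)` into a `StringWriter` keeps it inside the logger's output, whereas a bare `e.printStackTrace()` writes to stderr outside the logger's formatting.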




[GitHub] [openwhisk] tysonnorris commented on a change in pull request #4951: add defensive code for dealing with loadbalancer concurrency tracking

Posted by GitBox <gi...@apache.org>.
tysonnorris commented on a change in pull request #4951:
URL: https://github.com/apache/openwhisk/pull/4951#discussion_r486749255



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/common/NestedSemaphore.scala
##########
@@ -100,14 +100,19 @@ class NestedSemaphore[T](memoryPermits: Int) extends ForcibleSemaphore(memoryPer
     if (maxConcurrent == 1) {
       super.release(memoryPermits)
     } else {
-      val concurrentSlots = actionConcurrentSlotsMap(actionid)
-      val (memoryRelease, actionRelease) = concurrentSlots.release(1, true)
-      //concurrent slots
-      if (memoryRelease) {
-        super.release(memoryPermits)
-      }
-      if (actionRelease) {
-        actionConcurrentSlotsMap.remove(actionid)
+      //This map may be recreated (multiple times) due to cluster membership events, so don't assume the entry exists...
+      actionConcurrentSlotsMap.get(actionid) match {

Review comment:
       The stack trace is logged here for the catastrophic case where message consumer processing fails. Previously there was only a bare stack trace, with no indication that it was part of message processing. I think this is slightly better, since it produces an ERROR log entry and associates the failure with message consumption.




[GitHub] [openwhisk] markusthoemmes commented on a change in pull request #4951: add defensive code for dealing with loadbalancer concurrency tracking

Posted by GitBox <gi...@apache.org>.
markusthoemmes commented on a change in pull request #4951:
URL: https://github.com/apache/openwhisk/pull/4951#discussion_r483413869



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/common/NestedSemaphore.scala
##########
@@ -100,14 +100,19 @@ class NestedSemaphore[T](memoryPermits: Int) extends ForcibleSemaphore(memoryPer
     if (maxConcurrent == 1) {
       super.release(memoryPermits)
     } else {
-      val concurrentSlots = actionConcurrentSlotsMap(actionid)
-      val (memoryRelease, actionRelease) = concurrentSlots.release(1, true)
-      //concurrent slots
-      if (memoryRelease) {
-        super.release(memoryPermits)
-      }
-      if (actionRelease) {
-        actionConcurrentSlotsMap.remove(actionid)
+      //This map may be recreated (multiple times) due to cluster membership events, so don't assume the entry exists...
+      actionConcurrentSlotsMap.get(actionid) match {

Review comment:
       ```suggestion
         actionConcurrentSlotsMap.get(actionid).foreach { concurrentSlots =>
   ```
   
   This would spare you the awkward empty `None` case.
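For comparison, a small sketch of the two styles side by side; they behave the same, and the `foreach` form simply has no branch for the missing case. `OptionStyles`, `slots`, and the decrement are hypothetical, chosen only to compare the shapes.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical map and release functions, used only to compare the two styles.
object OptionStyles {
  val slots = TrieMap("action-a" -> 1)

  // Style in the PR: an explicit match with an empty None branch.
  def releaseWithMatch(id: String): Unit =
    slots.get(id) match {
      case Some(n) => slots.update(id, n - 1)
      case None    => () // entry was dropped by a reset; nothing to do
    }

  // Suggested style: foreach runs the body only when the entry is present.
  def releaseWithForeach(id: String): Unit =
    slots.get(id).foreach { n =>
      slots.update(id, n - 1)
    }
}
```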

##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageConsumer.scala
##########
@@ -213,7 +213,13 @@ class MessageFeed(description: String,
       outstandingMessages = outstandingMessages.tail
 
       if (logHandoff) logging.debug(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)")
-      handler(bytes)
+      handler(bytes).andThen {
+        {
+          case Failure(e) =>
+            logging.error(this, s"Failed to process message for topic $topic : $e  (stack trace included)")
+            e.printStackTrace()

Review comment:
       Isn't it already part of the log line above anyway? I guess the broader question is: Why is the stack trace even needed here?




[GitHub] [openwhisk] tysonnorris commented on a change in pull request #4951: add defensive code for dealing with loadbalancer concurrency tracking

Posted by GitBox <gi...@apache.org>.
tysonnorris commented on a change in pull request #4951:
URL: https://github.com/apache/openwhisk/pull/4951#discussion_r477502950



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageConsumer.scala
##########
@@ -213,7 +213,13 @@ class MessageFeed(description: String,
       outstandingMessages = outstandingMessages.tail
 
       if (logHandoff) logging.debug(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)")
-      handler(bytes)
+      handler(bytes).andThen {
+        {
+          case Failure(e) =>
+            logging.error(this, s"Failed to process message for topic $topic : $e  (stack trace included)")
+            e.printStackTrace()

Review comment:
       Not intentional. Can you point me to an example of better stack trace formatting?
   Thanks!




[GitHub] [openwhisk] tysonnorris commented on a change in pull request #4951: add defensive code for dealing with loadbalancer concurrency tracking

Posted by GitBox <gi...@apache.org>.
tysonnorris commented on a change in pull request #4951:
URL: https://github.com/apache/openwhisk/pull/4951#discussion_r485200531



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/common/NestedSemaphore.scala
##########
@@ -100,14 +100,19 @@ class NestedSemaphore[T](memoryPermits: Int) extends ForcibleSemaphore(memoryPer
     if (maxConcurrent == 1) {
       super.release(memoryPermits)
     } else {
-      val concurrentSlots = actionConcurrentSlotsMap(actionid)
-      val (memoryRelease, actionRelease) = concurrentSlots.release(1, true)
-      //concurrent slots
-      if (memoryRelease) {
-        super.release(memoryPermits)
-      }
-      if (actionRelease) {
-        actionConcurrentSlotsMap.remove(actionid)
+      //This map may be recreated (multiple times) due to cluster membership events, so don't assume the entry exists...
+      actionConcurrentSlotsMap.get(actionid) match {

Review comment:
       👍 




[GitHub] [openwhisk] rabbah commented on a change in pull request #4951: add defensive code for dealing with loadbalancer concurrency tracking

Posted by GitBox <gi...@apache.org>.
rabbah commented on a change in pull request #4951:
URL: https://github.com/apache/openwhisk/pull/4951#discussion_r477499064



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageConsumer.scala
##########
@@ -213,7 +213,13 @@ class MessageFeed(description: String,
       outstandingMessages = outstandingMessages.tail
 
       if (logHandoff) logging.debug(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)")
-      handler(bytes)
+      handler(bytes).andThen {
+        {
+          case Failure(e) =>
+            logging.error(this, s"Failed to process message for topic $topic : $e  (stack trace included)")
+            e.printStackTrace()

Review comment:
       Printing the stack trace this way keeps it out of the logger's formatting. Is that intended?




[GitHub] [openwhisk] markusthoemmes commented on a change in pull request #4951: add defensive code for dealing with loadbalancer concurrency tracking

Posted by GitBox <gi...@apache.org>.
markusthoemmes commented on a change in pull request #4951:
URL: https://github.com/apache/openwhisk/pull/4951#discussion_r530235102



##########
File path: common/scala/src/main/scala/org/apache/openwhisk/core/connector/MessageConsumer.scala
##########
@@ -213,7 +214,13 @@ class MessageFeed(description: String,
       outstandingMessages = outstandingMessages.tail
 
       if (logHandoff) logging.debug(this, s"processing $topic[$partition][$offset] ($occupancy/$handlerCapacity)")
-      handler(bytes)
+      handler(bytes).andThen {
+        {

Review comment:
       Are the inner braces needed?
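To illustrate the question: the extra braces form a block whose only expression is the partial-function literal, so both definitions below denote the same partial function. `BraceStyles` and the `String` result type are hypothetical, used only for the comparison.

```scala
import scala.util.{Failure, Try}

object BraceStyles {
  // Outer braces are a block; its result is the inner partial-function literal.
  val doubleBraced: PartialFunction[Try[Int], String] = {
    {
      case Failure(e) => s"failed: ${e.getMessage}"
    }
  }

  // The same partial function without the redundant block.
  val singleBraced: PartialFunction[Try[Int], String] = {
    case Failure(e) => s"failed: ${e.getMessage}"
  }
}
```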



