You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/08/10 07:18:43 UTC

[GitHub] jiangpengcheng closed pull request #3656: Adjust the behavior when update controller cluster

jiangpengcheng closed pull request #3656: Adjust the behavior when update controller cluster
URL: https://github.com/apache/incubator-openwhisk/pull/3656
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff of a foreign pull request (one from a
fork) once it is merged, the diff is displayed below for the sake of provenance:

diff --git a/common/scala/src/main/scala/whisk/common/ForcibleSemaphore.scala b/common/scala/src/main/scala/whisk/common/ForcibleSemaphore.scala
index 8eb1252e85..440a6f446f 100644
--- a/common/scala/src/main/scala/whisk/common/ForcibleSemaphore.scala
+++ b/common/scala/src/main/scala/whisk/common/ForcibleSemaphore.scala
@@ -82,9 +82,22 @@ class ForcibleSemaphore(maxAllowed: Int) {
         forceAquireShared(acquires)
       }
     }
+
+    /** Reduce the permits by `reductions` (no lower bound), retrying until the state update succeeds.
+     *  Roughly the same as `java.util.concurrent.Semaphore.Sync#reducePermits`.
+     */
+    @tailrec
+    final def reducePermits(reductions: Int): Unit = {
+      val available = getState
+      val newState = available - reductions
+      if (!compareAndSetState(available, newState)) {
+        reducePermits(reductions) // compare-and-set failed: re-read the state and retry
+      }
+    }
   }
 
   val sync = new Sync
+  private var maxPermits = maxAllowed
 
   /**
    * Acquires the given numbers of permits.
@@ -121,4 +134,18 @@ class ForcibleSemaphore(maxAllowed: Int) {
 
   /** Returns the number of currently available permits. Possibly negative. */
   def availablePermits: Int = sync.permits
+
+  /** Set the max permits for sync by shifting the available permits by the difference; must be positive. */
+  def setMaxPermits(newMaxPermits: Int): Unit = synchronized {
+    require(newMaxPermits > 0, "maxPermits must be positive") // zero is rejected as well
+    newMaxPermits - maxPermits match {
+      case 0 => // same maximum: nothing to do
+      case delta if delta > 0 =>
+        sync.releaseShared(delta) // increase the max permits
+      case delta if delta < 0 =>
+        sync.reducePermits(-delta) // reduce the max permits
+    }
+
+    maxPermits = newMaxPermits
+  }
 }
diff --git a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
index 47be740710..0ba1c19b56 100644
--- a/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
+++ b/core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
@@ -589,7 +589,7 @@ case class ShardingContainerPoolBalancerState(
       _clusterSize = actualSize
       val newTreshold = (totalInvokerThreshold / actualSize) max 1 // letting this fall below 1 doesn't make sense
       currentInvokerThreshold = newTreshold
-      _invokerSlots = _invokerSlots.map(_ => new ForcibleSemaphore(currentInvokerThreshold))
+      _invokerSlots.foreach(_.setMaxPermits(currentInvokerThreshold))
 
       logging.info(
         this,
diff --git a/tests/src/test/scala/whisk/common/ForcibleSemaphoreTests.scala b/tests/src/test/scala/whisk/common/ForcibleSemaphoreTests.scala
index 87cea2e276..9ab9e6c580 100644
--- a/tests/src/test/scala/whisk/common/ForcibleSemaphoreTests.scala
+++ b/tests/src/test/scala/whisk/common/ForcibleSemaphoreTests.scala
@@ -85,4 +85,18 @@ class ForcibleSemaphoreTests extends FlatSpec with Matchers {
       acquires should contain theSameElementsAs result
     }
   }
+
+  it should "set the max allowed permits dynamically" in {
+    val s = new ForcibleSemaphore(10)
+    s.tryAcquire(2) shouldBe true // 8 permits left
+
+    s.setMaxPermits(5) // reduce max permits by 5: 8 - 5 = 3 available
+    s.availablePermits shouldBe 3
+
+    s.setMaxPermits(10) // increase max permits by 5: 3 + 5 = 8 available
+    s.availablePermits shouldBe 8
+
+    s.setMaxPermits(10) // nothing changed
+    s.availablePermits shouldBe 8
+  }
 }
diff --git a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
index 4e906d0144..8360c2c064 100644
--- a/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
+++ b/tests/src/test/scala/whisk/core/loadBalancer/test/ShardingContainerPoolBalancerTests.scala
@@ -123,7 +123,8 @@ class ShardingContainerPoolBalancerTests extends FlatSpec with Matchers with Str
     state.invokerSlots.head.availablePermits shouldBe slots - 1
 
     state.updateCluster(2)
-    state.invokerSlots.head.availablePermits shouldBe slots / 2 // state reset + divided by 2
+    // one permit is still acquired, so availablePermits should be `(new threshold) - 1`
+    state.invokerSlots.head.availablePermits shouldBe slots / 2 - 1
   }
 
   it should "fallback to a size of 1 (alone) if cluster size is < 1" in {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services