You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/02/08 11:12:20 UTC

[GitHub] rabbah commented on a change in pull request #3240: Add a loadbalancer with local state and horizontal invoker sharding.

rabbah commented on a change in pull request #3240: Add a loadbalancer with local state and horizontal invoker sharding.
URL: https://github.com/apache/incubator-openwhisk/pull/3240#discussion_r166901098
 
 

 ##########
 File path: core/controller/src/main/scala/whisk/core/loadBalancer/ShardingContainerPoolBalancer.scala
 ##########
 @@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package whisk.core.loadBalancer
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.LongAdder
+import java.util.concurrent.ThreadLocalRandom
+
+import akka.actor.{Actor, ActorSystem, Props}
+import akka.cluster.ClusterEvent._
+import akka.cluster.{Cluster, Member, MemberStatus}
+import akka.event.Logging.InfoLevel
+import akka.stream.ActorMaterializer
+import org.apache.kafka.clients.producer.RecordMetadata
+import pureconfig._
+import whisk.common.{ForcableSemaphore, Logging, LoggingMarkers, TransactionId}
+import whisk.core.WhiskConfig._
+import whisk.core.connector._
+import whisk.core.entity._
+import whisk.core.{ConfigKeys, WhiskConfig}
+import whisk.spi.SpiLoader
+
+import scala.annotation.tailrec
+import scala.collection.concurrent.TrieMap
+import scala.concurrent.duration._
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success}
+
+/**
+ * A loadbalancer that uses "horizontal" sharding to not collide with fellow loadbalancers.
+ *
+ * Horizontal sharding means that each invoker's capacity is evenly divided between the loadbalancers. If an invoker
+ * has at most 16 slots available, those will be divided into 8 slots for each loadbalancer (if there are 2).
+ */
+class ShardingContainerPoolBalancer(config: WhiskConfig, controllerInstance: InstanceId)(
+  implicit val actorSystem: ActorSystem,
+  logging: Logging,
+  materializer: ActorMaterializer)
+    extends LoadBalancer {
+
+  private implicit val executionContext: ExecutionContext = actorSystem.dispatcher
+
+  /**
+   * Build a cluster of all loadbalancers.
+   *
+   * The seed nodes are taken from `config.controllerSeedNodes`; joining them happens as a side effect of
+   * constructing this balancer.
+   */
+  val seedNodesProvider = new StaticSeedNodesProvider(config.controllerSeedNodes, actorSystem.name)
+  val cluster = Cluster(actorSystem)
+  cluster.joinSeedNodes(seedNodesProvider.getSeedNodes())
+
+  /** Used to manage an action for testing invoker health */
+  private val entityStore = WhiskEntityStore.datastore(config)
+
+  /**
+   * State related to invocations and throttling:
+   * the registered activation entries keyed by activation id, a counter per UUID and an overall counter.
+   * The counters are incremented in `setupActivation`.
+   */
+  private val activations = TrieMap[ActivationId, ActivationEntry]()
+  private val activationsPerNamespace = TrieMap[UUID, LongAdder]()
+  private val totalActivations = new LongAdder()
+
+  /**
+   * State needed for scheduling.
+   *
+   * Read by `publish` and `invokerHealth`; written only from within the `monitor` actor.
+   */
+  private val schedulingState = ShardingContainerPoolBalancerState()()
+
+  /**
+   * Actor through which all updates of the scheduling state are funnelled.
+   *
+   * It handles invoker pool state messages as well as cluster membership and reachability events. Routing every
+   * update through this single actor is meant to guarantee that `updateInvokers` and `updateCluster` are applied
+   * one after another and never concurrently.
+   */
+  private val monitor = actorSystem.actorOf(Props(new Actor {
+    // cluster members currently regarded as available
+    var availableMembers = Set.empty[Member]
+
+    override def preStart(): Unit =
+      cluster.subscribe(self, classOf[MemberEvent], classOf[ReachabilityEvent])
+
+    /** Remembers the given member set and reports its size to the scheduling state. */
+    private def membersChanged(members: Set[Member]): Unit = {
+      availableMembers = members
+      schedulingState.updateCluster(members.size)
+    }
+
+    override def receive: Receive = {
+      case CurrentInvokerPoolState(newState) =>
+        schedulingState.updateInvokers(newState)
+
+      // Snapshot of the cluster as it is right now: only members with status `Up` count as available
+      case CurrentClusterState(members, _, _, _, _) =>
+        membersChanged(members.filter(_.status == MemberStatus.Up))
+
+      // Lifecycle and reachability events. Split-brain is not a huge concern here, because only the
+      // invoker-threshold is adjusted according to the perceived cluster-size. Dropping an unreachable member from
+      // this point-of-view gives a better experience even under split-brain-conditions: the worst case is premature
+      // overloading of invokers rather than going into overflow mode prematurely.
+      case event: ClusterDomainEvent =>
+        val remaining = event match {
+          case MemberUp(joined)            => availableMembers + joined
+          case ReachableMember(reachable)  => availableMembers + reachable
+          case MemberRemoved(removed, _)   => availableMembers - removed
+          case UnreachableMember(lost)     => availableMembers - lost
+          case _                           => availableMembers
+        }
+
+        // the cluster size is reported for every domain event, even if the member set did not change
+        membersChanged(remaining)
+    }
+  }))
+
+  /** Loadbalancer interface method: the invokers as currently held by the scheduling state. */
+  override def invokerHealth(): Future[IndexedSeq[InvokerHealth]] =
+    Future.successful(schedulingState.invokers)
+
+  /** Loadbalancer interface method: the counter value stored for `namespace`, or 0 if no counter exists for it. */
+  override def activeActivationsFor(namespace: UUID): Future[Int] = {
+    val active = activationsPerNamespace.get(namespace).fold(0)(_.intValue())
+    Future.successful(active)
+  }
+
+  /** Loadbalancer interface method: the value of the overall activation counter. */
+  override def totalActiveActivations: Future[Int] =
+    Future.successful(totalActivations.intValue())
+
+  /**
+   * 1. Publish a message to the loadbalancer
+   *
+   * Chooses among the blackbox invokers if `action.exec.pull` is set and among the managed invokers otherwise. The
+   * home invoker and the step size are both derived from a hash generated from the namespace and the action's fully
+   * qualified name.
+   *
+   * @param action the action to be invoked
+   * @param msg the activation message to send to the chosen invoker
+   * @param transid the transaction id of the request
+   * @return an outer future that completes once the message has been sent, carrying the future of the activation
+   *         entry's promise; a future failed with a `LoadBalancerException` if no invoker could be chosen
+   */
+  override def publish(action: ExecutableWhiskActionMetaData, msg: ActivationMessage)(
+    implicit transid: TransactionId): Future[Future[Either[ActivationId, WhiskActivation]]] = {
+
+    val candidates = if (action.exec.pull) schedulingState.blackboxInvokers else schedulingState.managedInvokers
+
+    val selected =
+      if (candidates.isEmpty) {
+        None
+      } else {
+        val hash = ShardingContainerPoolBalancer.generateHash(msg.user.namespace, action.fullyQualifiedName(false))
+        val home = hash % candidates.size
+        val step = schedulingState.stepSizes(hash % schedulingState.stepSizes.size)
+        ShardingContainerPoolBalancer.schedule(candidates, schedulingState.invokerSlots, home, step)
+      }
+
+    selected match {
+      case Some(invoker) =>
+        // the local bookkeeping is set up before the message is handed to the producer
+        val entry = setupActivation(msg, action, invoker)
+        sendActivationToInvoker(messageProducer, msg, invoker).map(_ => entry.promise.future)
+
+      case None =>
+        Future.failed(LoadBalancerException("No invokers available"))
+    }
+  }
+
+  /**
+   * 2. Update local state with the to be executed activation
+   *
+   * Increments the overall counter and the counter of the invoking user's UUID, then registers an entry for the
+   * activation id unless one is registered already.
+   *
+   * @param msg the activation message about to be sent
+   * @param action the action being invoked; its time limit determines the timeout
+   * @param instance the invoker the activation is scheduled to
+   * @return the entry registered for the activation id
+   */
+  private def setupActivation(msg: ActivationMessage,
+                              action: ExecutableWhiskActionMetaData,
+                              instance: InstanceId): ActivationEntry = {
+
+    totalActivations.increment()
+    activationsPerNamespace.getOrElseUpdate(msg.user.uuid, new LongAdder()).increment()
+
+    // the larger of the action's time limit and the standard time limit, plus one minute
+    val ackTimeout = action.limits.timeout.duration.max(TimeLimit.STD_DURATION) + 1.minute
+
+    // Builds a new entry together with its timeout handler. The handler covers the catastrophic case where an
+    // active ack is not received at all (say an invoker is down completely, or the connection to the message bus
+    // is disrupted) or is significantly delayed (possibly due to long queues, but the subject should not be
+    // penalized); in this case, if the activation handler is still registered, it is removed and the books are
+    // updated.
+    def freshEntry(): ActivationEntry = {
+      val timeoutHandler = actorSystem.scheduler.scheduleOnce(ackTimeout) {
+        processCompletion(Left(msg.activationId), msg.transid, forced = true, invoker = instance)
+      }
+
+      // please note: timeoutHandler.cancel must be called on all non-timeout paths, e.g. Success
+      ActivationEntry(
+        msg.activationId,
+        msg.user.uuid,
+        instance,
+        timeoutHandler,
+        Promise[Either[ActivationId, WhiskActivation]]())
+    }
+
+    // `freshEntry()` is a by-name argument: it is only evaluated if no entry exists for the activation id
+    activations.getOrElseUpdate(msg.activationId, freshEntry())
+  }
+
+  private val messagingProvider = SpiLoader.get[MessagingProvider] // messaging implementation obtained via the SPI loader
+  private val messageProducer = messagingProvider.getProducer(config, executionContext) // producer that `publish` passes on for sending
+
+  /** 3. Send the activation to the invoker */
+  private def sendActivationToInvoker(producer: MessageProducer,
+                                      msg: ActivationMessage,
+                                      invoker: InstanceId): Future[RecordMetadata] = {
+    implicit val transid: TransactionId = msg.transid
+
+    val topic = s"invoker${invoker.toInt}"
+    val start = transid.started(
+      this,
+      LoggingMarkers.CONTROLLER_KAFKA,
+      s"posting topic '$topic' with activation id '${msg.activationId}'",
 
 Review comment:
   Pick up message from @mhenkes's PR?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services