You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/08/25 13:49:52 UTC
[incubator-openwhisk] branch master updated: remove prototype of the invoker-agent based LogStore (#3975)
This is an automated email from the ASF dual-hosted git repository.
rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new acdf27e remove prototype of the invoker-agent based LogStore (#3975)
acdf27e is described below
commit acdf27ee31c8df1e184379ab8302c08c8637ea29
Author: David Grove <dg...@users.noreply.github.com>
AuthorDate: Sat Aug 25 09:49:47 2018 -0400
remove prototype of the invoker-agent based LogStore (#3975)
This commit matches the PR that removes the underlying prototype support
from the invoker agent in the kube-deploy project. The focus is instead
on evolving logging so that the invoker no longer needs to be involved
in log enrichment.
---
.../KubernetesClientWithInvokerAgent.scala | 38 +----------
.../kubernetes/KubernetesContainer.scala | 31 +--------
.../KubernetesInvokerAgentLogStore.scala | 76 ----------------------
.../kubernetes/test/KubernetesContainerTests.scala | 21 ------
4 files changed, 2 insertions(+), 164 deletions(-)
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
index 417a287..bf308ac 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
@@ -22,14 +22,12 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, MessageEntity, Uri}
-import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import pureconfig.loadConfigOrThrow
import spray.json._
import spray.json.DefaultJsonProtocol._
import whisk.core.ConfigKeys
-import whisk.core.entity.ByteSize
import collection.JavaConverters._
import scala.concurrent.{blocking, ExecutionContext, Future}
@@ -37,7 +35,7 @@ import scala.concurrent.{blocking, ExecutionContext, Future}
/**
* An extended kubernetes client that works in tandem with an invokerAgent DaemonSet with
* instances running on every worker node that runs user containers to provide
- * suspend/resume capability and higher performance log processing.
+ * suspend/resume capability.
*/
class KubernetesClientWithInvokerAgent(config: KubernetesClientConfig =
loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes))(
@@ -86,23 +84,6 @@ class KubernetesClientWithInvokerAgent(config: KubernetesClientConfig =
.map(_.discardEntityBytes())
}
- override def forwardLogs(container: KubernetesContainer,
- lastOffset: Long,
- sizeLimit: ByteSize,
- sentinelledLogs: Boolean,
- additionalMetadata: Map[String, JsValue],
- augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] = {
- val serializedData = Map(
- "lastOffset" -> JsNumber(lastOffset),
- "sizeLimit" -> JsNumber(sizeLimit.toBytes),
- "sentinelledLogs" -> JsBoolean(sentinelledLogs),
- "encodedLogLineMetadata" -> JsString(fieldsString(additionalMetadata)),
- "encodedActivation" -> JsString(augmentedActivation.compactPrint))
-
- agentCommand("logs", container, Some(serializedData))
- .flatMap(response => Unmarshal(response.entity).to[String].map(_.toLong))
- }
-
override def agentCommand(command: String,
container: KubernetesContainer,
payload: Option[Map[String, JsValue]] = None): Future[HttpResponse] = {
@@ -138,21 +119,4 @@ trait KubernetesApiWithInvokerAgent extends KubernetesApi {
container: KubernetesContainer,
payload: Option[Map[String, JsValue]] = None): Future[HttpResponse]
- /**
- * Forward a section the argument container's stdout/stderr output to an external logging service.
- *
- * @param container the container whose logs should be forwarded
- * @param lastOffset the last offset previously read in the remote log file
- * @param sizeLimit The maximum number of bytes of log that should be forwarded before truncation
- * @param sentinelledLogs Should the log forwarder expect a sentinel line at the end of stdout/stderr streams?
- * @param additionalMetadata Additional metadata that should be injected into every log line
- * @param augmentedActivation Activation record to be appended to the forwarded log.
- * @return the last offset read from the remote log file (to be used on next call to forwardLogs)
- */
- def forwardLogs(container: KubernetesContainer,
- lastOffset: Long,
- sizeLimit: ByteSize,
- sentinelledLogs: Boolean,
- additionalMetadata: Map[String, JsValue],
- augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long]
}
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index f812add..56e8f10 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -19,12 +19,11 @@ package whisk.core.containerpool.kubernetes
import akka.actor.ActorSystem
import java.time.Instant
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.AtomicReference
import akka.stream.StreamLimitReachedException
import akka.stream.scaladsl.Framing.FramingException
import akka.stream.scaladsl.Source
import akka.util.ByteString
-import spray.json._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
@@ -98,9 +97,6 @@ class KubernetesContainer(protected[core] val id: ContainerId,
/** The last read timestamp in the log file */
private val lastTimestamp = new AtomicReference[Option[Instant]](None)
- /** The last offset read in the remote log file */
- private val lastOffset = new AtomicLong(0)
-
protected val waitForLogs: FiniteDuration = 2.seconds
def suspend()(implicit transid: TransactionId): Future[Unit] = kubernetes.suspend(this)
@@ -114,31 +110,6 @@ class KubernetesContainer(protected[core] val id: ContainerId,
private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
- /**
- * Request that the activation's log output be forwarded to an external log service (implicit in LogProvider choice).
- * Additional per log line metadata and the activation record is provided to be optionally included
- * in the forwarded log entry.
- *
- * @param sizeLimit The maximum number of bytes of log that should be forwardewd
- * @param sentinelledLogs Should the log forwarder expect a sentinel line at the end of stdout/stderr streams?
- * @param additionalMetadata Additional metadata that should be injected into every log line
- * @param augmentedActivation Activation record to be appended to the forwarded log.
- */
- def forwardLogs(sizeLimit: ByteSize,
- sentinelledLogs: Boolean,
- additionalMetadata: Map[String, JsValue],
- augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Unit] = {
- kubernetes match {
- case client: KubernetesApiWithInvokerAgent => {
- client
- .forwardLogs(this, lastOffset.get, sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation)
- .map(newOffset => lastOffset.set(newOffset))
- }
- case _ =>
- Future.failed(new UnsupportedOperationException("forwardLogs requires whisk.kubernetes.invokerAgent.enabled"))
- }
- }
-
def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
kubernetes
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala
deleted file mode 100644
index 63e6cb0..0000000
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.containerpool.kubernetes
-
-import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import whisk.common.TransactionId
-import whisk.core.containerpool.Container
-import whisk.core.containerpool.logging.{LogCollectingException, LogDriverLogStore, LogStore, LogStoreProvider}
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-/**
- * A LogStore implementation for Kubernetes that delegates all log processing to a remote invokerAgent that
- * runs on the worker node where the user container is executing. The remote invokerAgent will read container logs,
- * enrich them with the activation-specific metadata it is provided, and consolidate them into a remote
- * combined log file that can be processed asynchronously by log forwarding services.
- *
- * Logs are never processed by the invoker itself and therefore are not stored in the activation record;
- * collectLogs will return an empty ActivationLogs.
- */
-class KubernetesInvokerAgentLogStore(system: ActorSystem) extends LogDriverLogStore(system) {
- implicit val ec: ExecutionContext = system.dispatcher
- implicit val mat: ActorMaterializer = ActorMaterializer()(system)
-
- override def collectLogs(transid: TransactionId,
- user: Identity,
- activation: WhiskActivation,
- container: Container,
- action: ExecutableWhiskAction): Future[ActivationLogs] = {
-
- val sizeLimit = action.limits.logs.asMegaBytes
- val sentinelledLogs = action.exec.sentinelledLogs
-
- // Add the userId field to every written record, so any background process can properly correlate.
- val userIdField = Map("namespaceId" -> user.namespace.uuid.toJson)
-
- val additionalMetadata = Map(
- "activationId" -> activation.activationId.asString.toJson,
- "action" -> action.fullyQualifiedName(false).asString.toJson) ++ userIdField
-
- val augmentedActivation = JsObject(activation.toJson.fields ++ userIdField)
-
- container match {
- case kc: KubernetesContainer => {
- kc.forwardLogs(sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation)(transid)
- .map { _ =>
- ActivationLogs()
- }
- }
- case _ => Future.failed(LogCollectingException(ActivationLogs()))
- }
- }
-}
-
-object KubernetesInvokerAgentLogStoreProvider extends LogStoreProvider {
- override def instance(actorSystem: ActorSystem): LogStore = new KubernetesInvokerAgentLogStore(actorSystem)
-}
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index bd3c948..2354bc9 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -50,7 +50,6 @@ import whisk.core.entity.ActivationResponse.Timeout
import whisk.core.entity.size._
import whisk.http.Messages
import whisk.core.containerpool.docker.test.DockerContainerTests._
-import whisk.core.containerpool.kubernetes.test.KubernetesClientTests.TestKubernetesClientWithInvokerAgent
import scala.collection.{immutable, mutable}
@@ -297,26 +296,6 @@ class KubernetesContainerTests
}
/*
- * LOG FORWARDING
- */
- it should "container should maintain lastOffset across calls to forwardLogs" in {
- implicit val kubernetes = new TestKubernetesClientWithInvokerAgent
- val id = ContainerId("id")
- val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
- val logChunk = 10.kilobytes
-
- await(container.forwardLogs(logChunk, false, Map.empty, JsObject.empty))
- await(container.forwardLogs(42.bytes, false, Map.empty, JsObject.empty))
- await(container.forwardLogs(logChunk, false, Map.empty, JsObject.empty))
- await(container.forwardLogs(42.bytes, false, Map.empty, JsObject.empty))
-
- kubernetes.forwardLogs(0) shouldBe (id, 0)
- kubernetes.forwardLogs(1) shouldBe (id, logChunk.toBytes)
- kubernetes.forwardLogs(2) shouldBe (id, logChunk.toBytes + 42)
- kubernetes.forwardLogs(3) shouldBe (id, 2 * logChunk.toBytes + 42)
- }
-
- /*
* LOGS
*/
it should "read a simple log with sentinel" in {