You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by ra...@apache.org on 2018/08/25 13:49:52 UTC

[incubator-openwhisk] branch master updated: remove prototype of the invoker-agent based LogStore (#3975)

This is an automated email from the ASF dual-hosted git repository.

rabbah pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new acdf27e  remove prototype of the invoker-agent based LogStore (#3975)
acdf27e is described below

commit acdf27ee31c8df1e184379ab8302c08c8637ea29
Author: David Grove <dg...@users.noreply.github.com>
AuthorDate: Sat Aug 25 09:49:47 2018 -0400

    remove prototype of the invoker-agent based LogStore (#3975)
    
    Matches the PR to remove the underlying prototype support
    from the invoker agent in the kube-deploy project. Focusing
    instead on evolving logging to avoid the need for the invoker
    to be involved in log enrichment.
---
 .../KubernetesClientWithInvokerAgent.scala         | 38 +----------
 .../kubernetes/KubernetesContainer.scala           | 31 +--------
 .../KubernetesInvokerAgentLogStore.scala           | 76 ----------------------
 .../kubernetes/test/KubernetesContainerTests.scala | 21 ------
 4 files changed, 2 insertions(+), 164 deletions(-)

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
index 417a287..bf308ac 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
@@ -22,14 +22,12 @@ import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.model.Uri.Path
 import akka.http.scaladsl.model.{HttpRequest, HttpResponse, MessageEntity, Uri}
-import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.http.scaladsl.marshalling.Marshal
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import pureconfig.loadConfigOrThrow
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.core.ConfigKeys
-import whisk.core.entity.ByteSize
 
 import collection.JavaConverters._
 import scala.concurrent.{blocking, ExecutionContext, Future}
@@ -37,7 +35,7 @@ import scala.concurrent.{blocking, ExecutionContext, Future}
 /**
  * An extended kubernetes client that works in tandem with an invokerAgent DaemonSet with
  * instances running on every worker node that runs user containers to provide
- * suspend/resume capability and higher performance log processing.
+ * suspend/resume capability.
  */
 class KubernetesClientWithInvokerAgent(config: KubernetesClientConfig =
                                          loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes))(
@@ -86,23 +84,6 @@ class KubernetesClientWithInvokerAgent(config: KubernetesClientConfig =
       .map(_.discardEntityBytes())
   }
 
-  override def forwardLogs(container: KubernetesContainer,
-                           lastOffset: Long,
-                           sizeLimit: ByteSize,
-                           sentinelledLogs: Boolean,
-                           additionalMetadata: Map[String, JsValue],
-                           augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] = {
-    val serializedData = Map(
-      "lastOffset" -> JsNumber(lastOffset),
-      "sizeLimit" -> JsNumber(sizeLimit.toBytes),
-      "sentinelledLogs" -> JsBoolean(sentinelledLogs),
-      "encodedLogLineMetadata" -> JsString(fieldsString(additionalMetadata)),
-      "encodedActivation" -> JsString(augmentedActivation.compactPrint))
-
-    agentCommand("logs", container, Some(serializedData))
-      .flatMap(response => Unmarshal(response.entity).to[String].map(_.toLong))
-  }
-
   override def agentCommand(command: String,
                             container: KubernetesContainer,
                             payload: Option[Map[String, JsValue]] = None): Future[HttpResponse] = {
@@ -138,21 +119,4 @@ trait KubernetesApiWithInvokerAgent extends KubernetesApi {
                    container: KubernetesContainer,
                    payload: Option[Map[String, JsValue]] = None): Future[HttpResponse]
 
-  /**
-   * Forward a section the argument container's stdout/stderr output to an external logging service.
-   *
-   * @param container the container whose logs should be forwarded
-   * @param lastOffset the last offset previously read in the remote log file
-   * @param sizeLimit The maximum number of bytes of log that should be forwarded before truncation
-   * @param sentinelledLogs Should the log forwarder expect a sentinel line at the end of stdout/stderr streams?
-   * @param additionalMetadata Additional metadata that should be injected into every log line
-   * @param augmentedActivation Activation record to be appended to the forwarded log.
-   * @return the last offset read from the remote log file (to be used on next call to forwardLogs)
-   */
-  def forwardLogs(container: KubernetesContainer,
-                  lastOffset: Long,
-                  sizeLimit: ByteSize,
-                  sentinelledLogs: Boolean,
-                  additionalMetadata: Map[String, JsValue],
-                  augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long]
 }
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index f812add..56e8f10 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -19,12 +19,11 @@ package whisk.core.containerpool.kubernetes
 
 import akka.actor.ActorSystem
 import java.time.Instant
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.AtomicReference
 import akka.stream.StreamLimitReachedException
 import akka.stream.scaladsl.Framing.FramingException
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
-import spray.json._
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
@@ -98,9 +97,6 @@ class KubernetesContainer(protected[core] val id: ContainerId,
   /** The last read timestamp in the log file */
   private val lastTimestamp = new AtomicReference[Option[Instant]](None)
 
-  /** The last offset read in the remote log file */
-  private val lastOffset = new AtomicLong(0)
-
   protected val waitForLogs: FiniteDuration = 2.seconds
 
   def suspend()(implicit transid: TransactionId): Future[Unit] = kubernetes.suspend(this)
@@ -114,31 +110,6 @@ class KubernetesContainer(protected[core] val id: ContainerId,
 
   private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
 
-  /**
-   * Request that the activation's log output be forwarded to an external log service (implicit in LogProvider choice).
-   * Additional per log line metadata and the activation record is provided to be optionally included
-   * in the forwarded log entry.
-   *
-   * @param sizeLimit The maximum number of bytes of log that should be forwardewd
-   * @param sentinelledLogs Should the log forwarder expect a sentinel line at the end of stdout/stderr streams?
-   * @param additionalMetadata Additional metadata that should be injected into every log line
-   * @param augmentedActivation Activation record to be appended to the forwarded log.
-   */
-  def forwardLogs(sizeLimit: ByteSize,
-                  sentinelledLogs: Boolean,
-                  additionalMetadata: Map[String, JsValue],
-                  augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Unit] = {
-    kubernetes match {
-      case client: KubernetesApiWithInvokerAgent => {
-        client
-          .forwardLogs(this, lastOffset.get, sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation)
-          .map(newOffset => lastOffset.set(newOffset))
-      }
-      case _ =>
-        Future.failed(new UnsupportedOperationException("forwardLogs requires whisk.kubernetes.invokerAgent.enabled"))
-    }
-  }
-
   def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
 
     kubernetes
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala
deleted file mode 100644
index 63e6cb0..0000000
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.containerpool.kubernetes
-
-import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import whisk.common.TransactionId
-import whisk.core.containerpool.Container
-import whisk.core.containerpool.logging.{LogCollectingException, LogDriverLogStore, LogStore, LogStoreProvider}
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-/**
- * A LogStore implementation for Kubernetes that delegates all log processing to a remote invokerAgent that
- * runs on the worker node where the user container is executing.  The remote invokerAgent will read container logs,
- * enrich them with the activation-specific metadata it is provided, and consolidate them into a remote
- * combined log file that can be processed asynchronously by log forwarding services.
- *
- * Logs are never processed by the invoker itself and therefore are not stored in the activation record;
- * collectLogs will return an empty ActivationLogs.
- */
-class KubernetesInvokerAgentLogStore(system: ActorSystem) extends LogDriverLogStore(system) {
-  implicit val ec: ExecutionContext = system.dispatcher
-  implicit val mat: ActorMaterializer = ActorMaterializer()(system)
-
-  override def collectLogs(transid: TransactionId,
-                           user: Identity,
-                           activation: WhiskActivation,
-                           container: Container,
-                           action: ExecutableWhiskAction): Future[ActivationLogs] = {
-
-    val sizeLimit = action.limits.logs.asMegaBytes
-    val sentinelledLogs = action.exec.sentinelledLogs
-
-    // Add the userId field to every written record, so any background process can properly correlate.
-    val userIdField = Map("namespaceId" -> user.namespace.uuid.toJson)
-
-    val additionalMetadata = Map(
-      "activationId" -> activation.activationId.asString.toJson,
-      "action" -> action.fullyQualifiedName(false).asString.toJson) ++ userIdField
-
-    val augmentedActivation = JsObject(activation.toJson.fields ++ userIdField)
-
-    container match {
-      case kc: KubernetesContainer => {
-        kc.forwardLogs(sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation)(transid)
-          .map { _ =>
-            ActivationLogs()
-          }
-      }
-      case _ => Future.failed(LogCollectingException(ActivationLogs()))
-    }
-  }
-}
-
-object KubernetesInvokerAgentLogStoreProvider extends LogStoreProvider {
-  override def instance(actorSystem: ActorSystem): LogStore = new KubernetesInvokerAgentLogStore(actorSystem)
-}
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index bd3c948..2354bc9 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -50,7 +50,6 @@ import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
 import whisk.http.Messages
 import whisk.core.containerpool.docker.test.DockerContainerTests._
-import whisk.core.containerpool.kubernetes.test.KubernetesClientTests.TestKubernetesClientWithInvokerAgent
 
 import scala.collection.{immutable, mutable}
 
@@ -297,26 +296,6 @@ class KubernetesContainerTests
   }
 
   /*
-   * LOG FORWARDING
-   */
-  it should "container should maintain lastOffset across calls to forwardLogs" in {
-    implicit val kubernetes = new TestKubernetesClientWithInvokerAgent
-    val id = ContainerId("id")
-    val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
-    val logChunk = 10.kilobytes
-
-    await(container.forwardLogs(logChunk, false, Map.empty, JsObject.empty))
-    await(container.forwardLogs(42.bytes, false, Map.empty, JsObject.empty))
-    await(container.forwardLogs(logChunk, false, Map.empty, JsObject.empty))
-    await(container.forwardLogs(42.bytes, false, Map.empty, JsObject.empty))
-
-    kubernetes.forwardLogs(0) shouldBe (id, 0)
-    kubernetes.forwardLogs(1) shouldBe (id, logChunk.toBytes)
-    kubernetes.forwardLogs(2) shouldBe (id, logChunk.toBytes + 42)
-    kubernetes.forwardLogs(3) shouldBe (id, 2 * logChunk.toBytes + 42)
-  }
-
-  /*
    * LOGS
    */
   it should "read a simple log with sentinel" in {