You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@openwhisk.apache.org by GitBox <gi...@apache.org> on 2018/08/25 13:49:49 UTC

[GitHub] rabbah closed pull request #3975: remove prototype of the invoker-agent based LogStore

rabbah closed pull request #3975: remove prototype of the invoker-agent based LogStore
URL: https://github.com/apache/incubator-openwhisk/pull/3975
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
index 417a2877e1..bf308ac818 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesClientWithInvokerAgent.scala
@@ -22,14 +22,12 @@ import akka.actor.ActorSystem
 import akka.http.scaladsl.Http
 import akka.http.scaladsl.model.Uri.Path
 import akka.http.scaladsl.model.{HttpRequest, HttpResponse, MessageEntity, Uri}
-import akka.http.scaladsl.unmarshalling.Unmarshal
 import akka.http.scaladsl.marshalling.Marshal
 import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
 import pureconfig.loadConfigOrThrow
 import spray.json._
 import spray.json.DefaultJsonProtocol._
 import whisk.core.ConfigKeys
-import whisk.core.entity.ByteSize
 
 import collection.JavaConverters._
 import scala.concurrent.{blocking, ExecutionContext, Future}
@@ -37,7 +35,7 @@ import scala.concurrent.{blocking, ExecutionContext, Future}
 /**
  * An extended kubernetes client that works in tandem with an invokerAgent DaemonSet with
  * instances running on every worker node that runs user containers to provide
- * suspend/resume capability and higher performance log processing.
+ * suspend/resume capability.
  */
 class KubernetesClientWithInvokerAgent(config: KubernetesClientConfig =
                                          loadConfigOrThrow[KubernetesClientConfig](ConfigKeys.kubernetes))(
@@ -86,23 +84,6 @@ class KubernetesClientWithInvokerAgent(config: KubernetesClientConfig =
       .map(_.discardEntityBytes())
   }
 
-  override def forwardLogs(container: KubernetesContainer,
-                           lastOffset: Long,
-                           sizeLimit: ByteSize,
-                           sentinelledLogs: Boolean,
-                           additionalMetadata: Map[String, JsValue],
-                           augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long] = {
-    val serializedData = Map(
-      "lastOffset" -> JsNumber(lastOffset),
-      "sizeLimit" -> JsNumber(sizeLimit.toBytes),
-      "sentinelledLogs" -> JsBoolean(sentinelledLogs),
-      "encodedLogLineMetadata" -> JsString(fieldsString(additionalMetadata)),
-      "encodedActivation" -> JsString(augmentedActivation.compactPrint))
-
-    agentCommand("logs", container, Some(serializedData))
-      .flatMap(response => Unmarshal(response.entity).to[String].map(_.toLong))
-  }
-
   override def agentCommand(command: String,
                             container: KubernetesContainer,
                             payload: Option[Map[String, JsValue]] = None): Future[HttpResponse] = {
@@ -138,21 +119,4 @@ trait KubernetesApiWithInvokerAgent extends KubernetesApi {
                    container: KubernetesContainer,
                    payload: Option[Map[String, JsValue]] = None): Future[HttpResponse]
 
-  /**
-   * Forward a section the argument container's stdout/stderr output to an external logging service.
-   *
-   * @param container the container whose logs should be forwarded
-   * @param lastOffset the last offset previously read in the remote log file
-   * @param sizeLimit The maximum number of bytes of log that should be forwarded before truncation
-   * @param sentinelledLogs Should the log forwarder expect a sentinel line at the end of stdout/stderr streams?
-   * @param additionalMetadata Additional metadata that should be injected into every log line
-   * @param augmentedActivation Activation record to be appended to the forwarded log.
-   * @return the last offset read from the remote log file (to be used on next call to forwardLogs)
-   */
-  def forwardLogs(container: KubernetesContainer,
-                  lastOffset: Long,
-                  sizeLimit: ByteSize,
-                  sentinelledLogs: Boolean,
-                  additionalMetadata: Map[String, JsValue],
-                  augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Long]
 }
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
index f812add784..56e8f10d03 100644
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
+++ b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesContainer.scala
@@ -19,12 +19,11 @@ package whisk.core.containerpool.kubernetes
 
 import akka.actor.ActorSystem
 import java.time.Instant
-import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
+import java.util.concurrent.atomic.AtomicReference
 import akka.stream.StreamLimitReachedException
 import akka.stream.scaladsl.Framing.FramingException
 import akka.stream.scaladsl.Source
 import akka.util.ByteString
-import spray.json._
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
@@ -98,9 +97,6 @@ class KubernetesContainer(protected[core] val id: ContainerId,
   /** The last read timestamp in the log file */
   private val lastTimestamp = new AtomicReference[Option[Instant]](None)
 
-  /** The last offset read in the remote log file */
-  private val lastOffset = new AtomicLong(0)
-
   protected val waitForLogs: FiniteDuration = 2.seconds
 
   def suspend()(implicit transid: TransactionId): Future[Unit] = kubernetes.suspend(this)
@@ -114,31 +110,6 @@ class KubernetesContainer(protected[core] val id: ContainerId,
 
   private val stringSentinel = DockerContainer.ActivationSentinel.utf8String
 
-  /**
-   * Request that the activation's log output be forwarded to an external log service (implicit in LogProvider choice).
-   * Additional per log line metadata and the activation record is provided to be optionally included
-   * in the forwarded log entry.
-   *
-   * @param sizeLimit The maximum number of bytes of log that should be forwardewd
-   * @param sentinelledLogs Should the log forwarder expect a sentinel line at the end of stdout/stderr streams?
-   * @param additionalMetadata Additional metadata that should be injected into every log line
-   * @param augmentedActivation Activation record to be appended to the forwarded log.
-   */
-  def forwardLogs(sizeLimit: ByteSize,
-                  sentinelledLogs: Boolean,
-                  additionalMetadata: Map[String, JsValue],
-                  augmentedActivation: JsObject)(implicit transid: TransactionId): Future[Unit] = {
-    kubernetes match {
-      case client: KubernetesApiWithInvokerAgent => {
-        client
-          .forwardLogs(this, lastOffset.get, sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation)
-          .map(newOffset => lastOffset.set(newOffset))
-      }
-      case _ =>
-        Future.failed(new UnsupportedOperationException("forwardLogs requires whisk.kubernetes.invokerAgent.enabled"))
-    }
-  }
-
   def logs(limit: ByteSize, waitForSentinel: Boolean)(implicit transid: TransactionId): Source[ByteString, Any] = {
 
     kubernetes
diff --git a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala b/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala
deleted file mode 100644
index 63e6cb070d..0000000000
--- a/core/invoker/src/main/scala/whisk/core/containerpool/kubernetes/KubernetesInvokerAgentLogStore.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package whisk.core.containerpool.kubernetes
-
-import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
-import spray.json._
-import spray.json.DefaultJsonProtocol._
-import whisk.common.TransactionId
-import whisk.core.containerpool.Container
-import whisk.core.containerpool.logging.{LogCollectingException, LogDriverLogStore, LogStore, LogStoreProvider}
-import whisk.core.entity.{ActivationLogs, ExecutableWhiskAction, Identity, WhiskActivation}
-
-import scala.concurrent.{ExecutionContext, Future}
-
-/**
- * A LogStore implementation for Kubernetes that delegates all log processing to a remote invokerAgent that
- * runs on the worker node where the user container is executing.  The remote invokerAgent will read container logs,
- * enrich them with the activation-specific metadata it is provided, and consolidate them into a remote
- * combined log file that can be processed asynchronously by log forwarding services.
- *
- * Logs are never processed by the invoker itself and therefore are not stored in the activation record;
- * collectLogs will return an empty ActivationLogs.
- */
-class KubernetesInvokerAgentLogStore(system: ActorSystem) extends LogDriverLogStore(system) {
-  implicit val ec: ExecutionContext = system.dispatcher
-  implicit val mat: ActorMaterializer = ActorMaterializer()(system)
-
-  override def collectLogs(transid: TransactionId,
-                           user: Identity,
-                           activation: WhiskActivation,
-                           container: Container,
-                           action: ExecutableWhiskAction): Future[ActivationLogs] = {
-
-    val sizeLimit = action.limits.logs.asMegaBytes
-    val sentinelledLogs = action.exec.sentinelledLogs
-
-    // Add the userId field to every written record, so any background process can properly correlate.
-    val userIdField = Map("namespaceId" -> user.namespace.uuid.toJson)
-
-    val additionalMetadata = Map(
-      "activationId" -> activation.activationId.asString.toJson,
-      "action" -> action.fullyQualifiedName(false).asString.toJson) ++ userIdField
-
-    val augmentedActivation = JsObject(activation.toJson.fields ++ userIdField)
-
-    container match {
-      case kc: KubernetesContainer => {
-        kc.forwardLogs(sizeLimit, sentinelledLogs, additionalMetadata, augmentedActivation)(transid)
-          .map { _ =>
-            ActivationLogs()
-          }
-      }
-      case _ => Future.failed(LogCollectingException(ActivationLogs()))
-    }
-  }
-}
-
-object KubernetesInvokerAgentLogStoreProvider extends LogStoreProvider {
-  override def instance(actorSystem: ActorSystem): LogStore = new KubernetesInvokerAgentLogStore(actorSystem)
-}
diff --git a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
index bd3c948410..2354bc9d18 100644
--- a/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
+++ b/tests/src/test/scala/whisk/core/containerpool/kubernetes/test/KubernetesContainerTests.scala
@@ -50,7 +50,6 @@ import whisk.core.entity.ActivationResponse.Timeout
 import whisk.core.entity.size._
 import whisk.http.Messages
 import whisk.core.containerpool.docker.test.DockerContainerTests._
-import whisk.core.containerpool.kubernetes.test.KubernetesClientTests.TestKubernetesClientWithInvokerAgent
 
 import scala.collection.{immutable, mutable}
 
@@ -296,26 +295,6 @@ class KubernetesContainerTests
     end.token shouldBe INVOKER_ACTIVATION_RUN.asFinish
   }
 
-  /*
-   * LOG FORWARDING
-   */
-  it should "container should maintain lastOffset across calls to forwardLogs" in {
-    implicit val kubernetes = new TestKubernetesClientWithInvokerAgent
-    val id = ContainerId("id")
-    val container = new KubernetesContainer(id, ContainerAddress("ip"), "127.0.0.1", "docker://foo")
-    val logChunk = 10.kilobytes
-
-    await(container.forwardLogs(logChunk, false, Map.empty, JsObject.empty))
-    await(container.forwardLogs(42.bytes, false, Map.empty, JsObject.empty))
-    await(container.forwardLogs(logChunk, false, Map.empty, JsObject.empty))
-    await(container.forwardLogs(42.bytes, false, Map.empty, JsObject.empty))
-
-    kubernetes.forwardLogs(0) shouldBe (id, 0)
-    kubernetes.forwardLogs(1) shouldBe (id, logChunk.toBytes)
-    kubernetes.forwardLogs(2) shouldBe (id, logChunk.toBytes + 42)
-    kubernetes.forwardLogs(3) shouldBe (id, 2 * logChunk.toBytes + 42)
-  }
-
   /*
    * LOGS
    */


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services