You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@openwhisk.apache.org by cb...@apache.org on 2018/01/08 11:53:49 UTC
[incubator-openwhisk] branch master updated: Use alpakka's
FileRotator instead of the copied one. (#3102)
This is an automated email from the ASF dual-hosted git repository.
cbickel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new b485c5e Use alpakka's FileRotator instead of the copied one. (#3102)
b485c5e is described below
commit b485c5e249ed45e0395f2e8056972adb737f0353
Author: Markus Thömmes <ma...@me.com>
AuthorDate: Mon Jan 8 12:53:45 2018 +0100
Use alpakka's FileRotator instead of the copied one. (#3102)
---
common/scala/build.gradle | 2 +
.../containerpool/logging/LogRotatorSink.scala | 176 ---------------------
core/invoker/build.gradle | 1 -
3 files changed, 2 insertions(+), 177 deletions(-)
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 31781f4..2c04731 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -21,6 +21,8 @@ dependencies {
compile 'com.typesafe.akka:akka-http-core_2.11:10.0.10'
compile 'com.typesafe.akka:akka-http-spray-json_2.11:10.0.10'
+ compile 'com.lightbend.akka:akka-stream-alpakka-file_2.11:0.15'
+
compile 'ch.qos.logback:logback-classic:1.2.3'
compile 'org.slf4j:jcl-over-slf4j:1.7.25'
compile 'org.slf4j:log4j-over-slf4j:1.7.25'
diff --git a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala b/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala
deleted file mode 100644
index 6c5681b..0000000
--- a/common/scala/src/main/scala/whisk/core/containerpool/logging/LogRotatorSink.scala
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-// TO BE TAKEN OUT AFTER ALPAKKA 0.15 RELEASE
-
-/*
- * Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
- */
-
-package akka.stream.alpakka.file.scaladsl
-
-import java.nio.file.{OpenOption, Path, StandardOpenOption}
-
-import akka.Done
-import akka.stream.ActorAttributes.SupervisionStrategy
-import akka.stream._
-import akka.stream.impl.fusing.MapAsync.{Holder, NotYetThere}
-import akka.stream.scaladsl.{FileIO, Sink, Source}
-import akka.stream.stage._
-import akka.util.ByteString
-
-import scala.concurrent.{ExecutionContext, Future, Promise}
-import scala.util.{Failure, Success}
-
-object LogRotatorSink {
- def apply(functionGeneratorFunction: () => ByteString => Option[Path],
- fileOpenOptions: Set[OpenOption] = Set(StandardOpenOption.APPEND, StandardOpenOption.CREATE))
- : Sink[ByteString, Future[Done]] =
- Sink.fromGraph(new LogRotatorSink(functionGeneratorFunction, fileOpenOptions))
-}
-
-final private[scaladsl] class LogRotatorSink(functionGeneratorFunction: () => ByteString => Option[Path],
- fileOpenOptions: Set[OpenOption])
- extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[Done]] {
-
- val in = Inlet[ByteString]("FRotator.in")
- override val shape = SinkShape.of(in)
-
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
- val promise = Promise[Done]()
- val logic = new GraphStageLogic(shape) {
- val pathGeneratorFunction: ByteString => Option[Path] = functionGeneratorFunction()
- var sourceOut: SubSourceOutlet[ByteString] = _
- var fileSinkCompleted: Seq[Future[IOResult]] = Seq.empty
- val decider =
- inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
-
- def failThisStage(ex: Throwable): Unit =
- if (!promise.isCompleted) {
- if (sourceOut != null) {
- sourceOut.fail(ex)
- }
- cancel(in)
- promise.failure(ex)
- }
-
- def generatePathOrFailPeacefully(data: ByteString): Option[Path] = {
- var ret = Option.empty[Path]
- try {
- ret = pathGeneratorFunction(data)
- } catch {
- case ex: Throwable =>
- failThisStage(ex)
- }
- ret
- }
-
- def fileSinkFutureCallbackHandler(future: Future[IOResult])(h: Holder[IOResult]): Unit =
- h.elem match {
- case Success(IOResult(_, Failure(ex))) if decider(ex) == Supervision.Stop =>
- promise.failure(ex)
- case Success(x) if fileSinkCompleted.size == 1 && fileSinkCompleted.head == future =>
- promise.trySuccess(Done)
- completeStage()
- case x: Success[IOResult] =>
- fileSinkCompleted = fileSinkCompleted.filter(_ != future)
- case Failure(ex) =>
- failThisStage(ex)
- case _ =>
- }
-
- //init stage where we are waiting for the first path
- setHandler(
- in,
- new InHandler {
- override def onPush(): Unit = {
- val data = grab(in)
- val pathO = generatePathOrFailPeacefully(data)
- pathO.fold(if (!isClosed(in)) pull(in))(switchPath(_, data))
- }
-
- override def onUpstreamFinish(): Unit =
- completeStage()
-
- override def onUpstreamFailure(ex: Throwable): Unit =
- failThisStage(ex)
- })
-
- //we must pull the first element cos we are a sink
- override def preStart(): Unit = {
- super.preStart()
- pull(in)
- }
-
- def futureCB(newFuture: Future[IOResult]) =
- getAsyncCallback[Holder[IOResult]](fileSinkFutureCallbackHandler(newFuture))
-
- //we recreate the tail of the stream, and emit the data for the next req
- def switchPath(path: Path, data: ByteString): Unit = {
- val prevOut = Option(sourceOut)
-
- sourceOut = new SubSourceOutlet[ByteString]("FRotatorSource")
- sourceOut.setHandler(new OutHandler {
- override def onPull(): Unit = {
- sourceOut.push(data)
- switchToNormalMode()
- }
- })
- val newFuture = Source
- .fromGraph(sourceOut.source)
- .runWith(FileIO.toPath(path, fileOpenOptions))(interpreter.subFusingMaterializer)
-
- fileSinkCompleted = fileSinkCompleted :+ newFuture
-
- val holder = new Holder[IOResult](NotYetThere, futureCB(newFuture))
-
- newFuture.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
-
- prevOut.foreach(_.complete())
- }
-
- //we change path if needed or push the grabbed data
- def switchToNormalMode(): Unit = {
- setHandler(
- in,
- new InHandler {
- override def onPush(): Unit = {
- val data = grab(in)
- val pathO = generatePathOrFailPeacefully(data)
- pathO.fold(sourceOut.push(data))(switchPath(_, data))
- }
-
- override def onUpstreamFinish(): Unit = {
- implicit val executionContext: ExecutionContext =
- akka.dispatch.ExecutionContexts.sameThreadExecutionContext
- promise.completeWith(Future.sequence(fileSinkCompleted).map(_ => Done))
- sourceOut.complete()
- }
-
- override def onUpstreamFailure(ex: Throwable): Unit =
- failThisStage(ex)
- })
- sourceOut.setHandler(new OutHandler {
- override def onPull(): Unit =
- pull(in)
- })
- }
- }
- (logic, promise.future)
- }
-
-}
diff --git a/core/invoker/build.gradle b/core/invoker/build.gradle
index c0108a0..4a9ceff 100644
--- a/core/invoker/build.gradle
+++ b/core/invoker/build.gradle
@@ -16,7 +16,6 @@ dependencies {
compile 'org.apache.curator:curator-recipes:4.0.0', { exclude group: 'org.apache.zookeeper', module:'zookeeper' }
compile 'org.apache.zookeeper:zookeeper:3.4.11'
- compile 'com.lightbend.akka:akka-stream-alpakka-file_2.11:0.14'
}
tasks.withType(ScalaCompile) {
--
To stop receiving notification emails like this one, please contact
['"commits@openwhisk.apache.org" <co...@openwhisk.apache.org>'].