You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/11 02:33:21 UTC
[4/5] incubator-gearpump git commit: Fixes #22 support akka-streams
Gearpump Materializer
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
deleted file mode 100644
index 1ec724e..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Dispatchers
-import akka.stream.ModuleGraph.{Edge, MaterializedValueSourceAttribute}
-import akka.stream.actor.ActorSubscriber
-import akka.stream.gearpump.materializer.LocalMaterializerImpl.MaterializedModule
-import akka.stream.gearpump.module.ReduceModule
-import akka.stream.gearpump.util.MaterializedValueOps
-import akka.stream.impl.Stages.{DirectProcessor, Fold, StageModule}
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, ExposedPublisher, FanIn, FanOut, SinkModule, SourceModule, VirtualProcessor}
-import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, InPort, MaterializationContext, Materializer, OutPort, Shape}
-import org.reactivestreams.{Processor, Publisher, Subscriber}
-
-import org.apache.gearpump.util.Graph
-
-/**
- * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
- *
- * @param system
- * @param settings
- * @param dispatchers
- * @param supervisor
- * @param haveShutDown
- * @param flowNameCounter
- * @param namePrefix
- * @param optimizations
- */
-class LocalMaterializerImpl (
- system: ActorSystem,
- settings: ActorMaterializerSettings,
- dispatchers: Dispatchers,
- supervisor: ActorRef,
- haveShutDown: AtomicBoolean,
- flowNameCounter: AtomicLong,
- namePrefix: String,
- optimizations: Optimizations)
- extends LocalMaterializer(
- system, settings, dispatchers, supervisor,
- haveShutDown, flowNameCounter, namePrefix, optimizations) {
-
- override def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = {
- val materializedGraph = graph.mapVertex { module =>
- materializeAtomic(module)
- }
-
- materializedGraph.edges.foreach { nodeEdgeNode =>
- val (node1, edge, node2) = nodeEdgeNode
- val from = edge.from
- val to = edge.to
- val publisher = node1.outputs(from).asInstanceOf[Publisher[Any]]
- val subscriber = node2.inputs(to).asInstanceOf[Subscriber[Any]]
- publisher.subscribe(subscriber)
- }
-
- val matValues = inputMatValues ++ materializedGraph.vertices.map { vertex =>
- (vertex.module, vertex.matValue)
- }.toMap
-
- val matValueSources = materializedGraph.vertices.filter(_.module.isInstanceOf[MaterializedValueSource[_]])
- publishToMaterializedValueSource(matValueSources, matValues)
-
- matValues
- }
-
- private def publishToMaterializedValueSource(modules: List[MaterializedModule], matValues: Map[Module, Any]) = {
- modules.foreach { module =>
- val source = module.module.asInstanceOf[MaterializedValueSource[_]]
- val attr = source.attributes.getAttribute(classOf[MaterializedValueSourceAttribute], null)
-
- Option(attr).map { attr =>
- val valueToPublish = MaterializedValueOps(attr.mat).resolve[Any](matValues)
- module.outputs.foreach { portAndPublisher =>
- val (port, publisher) = portAndPublisher
- publisher match {
- case valuePublisher: MaterializedValuePublisher =>
- valuePublisher.setValue(valueToPublish)
- }
- }
- }
- }
- }
-
- private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
-
- private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
-
- val flowName = createFlowName()
- var nextId = 0
-
- def stageName(attr: Attributes): String = {
- val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
- nextId += 1
- name
- }
-
- private def materializeAtomic(atomic: Module): MaterializedModule = {
- val effectiveAttributes = atomic.attributes
-
- def newMaterializationContext() =
- new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes))
-
- atomic match {
- case matValue: MaterializedValueSource[_] =>
- val pub = new MaterializedValuePublisher
- val outputs = Map[OutPort, Publisher[_]](matValue.shape.outlet -> pub)
- MaterializedModule(matValue, (), outputs = outputs)
- case sink: SinkModule[_, _] =>
- val (sub, mat) = sink.create(newMaterializationContext())
- val inputs = Map[InPort, Subscriber[_]](sink.shape.inlet -> sub)
- MaterializedModule(sink, mat, inputs)
- case source: SourceModule[_, _] =>
- val (pub, mat) = source.create(newMaterializationContext())
- val outputs = Map[OutPort, Publisher[_]](source.shape.outlet -> pub)
- MaterializedModule(source, mat, outputs = outputs)
-
- case reduce: ReduceModule[Any] =>
- //TODO: remove this after the official akka-stream API support the Reduce Module
- val stage = LocalMaterializerImpl.toFoldModule(reduce)
- val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
- val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
- val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor)
- MaterializedModule(stage, mat, inputs, outputs)
-
- case stage: StageModule =>
- val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
- val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
- val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor)
- MaterializedModule(stage, mat, inputs, outputs)
- case tls: TlsModule => // TODO solve this so TlsModule doesn't need special treatment here
- val es = effectiveSettings(effectiveAttributes)
- val props =
- SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing)
- val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
- def factory(id: Int) = new ActorPublisher[Any](impl) {
- override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
- }
- val publishers = Vector.tabulate(2)(factory)
- impl ! FanOut.ExposedPublishers(publishers)
-
- val inputs = Map[InPort, Subscriber[_]](
- tls.plainIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn),
- tls.cipherIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.TransportIn))
-
- val outputs = Map[OutPort, Publisher[_]](
- tls.plainOut -> publishers(SslTlsCipherActor.UserOut),
- tls.cipherOut -> publishers(SslTlsCipherActor.TransportOut))
- MaterializedModule(tls, (), inputs, outputs)
-
- case junction: JunctionModule =>
- materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes))
- }
- }
-
- private def processorFor(op: StageModule,
- effectiveAttributes: Attributes,
- effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match {
- case DirectProcessor(processorFactory, _) => processorFactory()
- case Identity(attr) => (new VirtualProcessor, ())
- case _ =>
- val (opprops, mat) = ActorProcessorFactory.props(LocalMaterializerImpl.this, op, effectiveAttributes)
- ActorProcessorFactory[Any, Any](
- actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat
- }
-
- private def materializeJunction(
- op: JunctionModule,
- effectiveAttributes: Attributes,
- effectiveSettings: ActorMaterializerSettings): MaterializedModule = {
- op match {
- case fanin: FanInModule =>
- val (props, inputs, output) = fanin match {
-
- case MergeModule(shape, _) =>
- (FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, shape.out)
-
- case f: FlexiMergeModule[_, Shape] =>
- val flexi = f.flexi(f.shape)
- val shape: Shape = f.shape
- (FlexiMerge.props(effectiveSettings, f.shape, flexi), shape.inlets, shape.outlets.head)
-
- case MergePreferredModule(shape, _) =>
- (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inSeq, shape.out)
-
- case ConcatModule(shape, _) =>
- require(shape.inSeq.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO
- (Concat.props(effectiveSettings), shape.inSeq, shape.out)
-
- case zip: ZipWithModule =>
- (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head)
- }
-
- val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
- val publisher = new ActorPublisher[Any](impl)
- // Resolve cyclic dependency with actor. This MUST be the first message no matter what.
- impl ! ExposedPublisher(publisher)
-
- val inputMapping: Map[InPort, Subscriber[_]] = inputs.zipWithIndex.map { pair =>
- val (in, id) = pair
- (in, FanIn.SubInput[Any](impl, id))
- }.toMap
-
- val outMapping = Map(output -> publisher)
- MaterializedModule(fanin, (), inputMapping, outMapping)
-
- case fanout: FanOutModule =>
- val (props, in, outs) = fanout match {
-
- case r: FlexiRouteModule[t, Shape] =>
- val flexi = r.flexi(r.shape)
- val shape: Shape = r.shape
- (FlexiRoute.props(effectiveSettings, r.shape, flexi), shape.inlets.head: InPort, r.shape.outlets)
-
- case BroadcastModule(shape, eagerCancel, _) =>
- (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq)
-
- case BalanceModule(shape, waitForDownstreams, _) =>
- (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)
-
- case unzip: UnzipWithModule =>
- (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets)
- }
- val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
- val size = outs.size
- def factory(id: Int) =
- new ActorPublisher[Any](impl) {
- override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
- }
- val publishers =
- if (outs.size < 8) Vector.tabulate(size)(factory)
- else List.tabulate(size)(factory)
-
- impl ! FanOut.ExposedPublishers(publishers)
- val outputs: Map[OutPort, Publisher[_]] = publishers.iterator.zip(outs.iterator).map { case (pub, out) =>
- (out, pub)
- }.toMap
-
- val inputs: Map[InPort, Subscriber[_]] = Map(in -> ActorSubscriber[Any](impl))
- MaterializedModule(fanout, (), inputs, outputs)
- }
- }
-
- override def withNamePrefix(name: String): Materializer = {
- new LocalMaterializerImpl(system, settings, dispatchers, supervisor,
- haveShutDown, flowNameCounter, namePrefix = name, optimizations)
- }
-}
-
-object LocalMaterializerImpl {
- case class MaterializedModule(val module: Module, val matValue: Any, inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]], outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
-
- def toFoldModule(reduce: ReduceModule[Any]): Fold = {
- val f = reduce.f
- val aggregator = { (zero: Any, input: Any) =>
- if (zero == null) {
- input
- } else {
- f(zero, input)
- }
- }
- new Fold(null, aggregator)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
deleted file mode 100644
index 47ed1f2..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import akka.actor.ActorSystem
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.GearAttributes
-import akka.stream.gearpump.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
-import akka.stream.gearpump.task.{BalanceTask, BroadcastTask, GraphTask, SinkBridgeTask, SourceBridgeTask, UnZip2Task}
-import akka.stream.impl.Stages
-import akka.stream.impl.Stages.StageModule
-import akka.stream.impl.StreamLayout.Module
-import org.slf4j.LoggerFactory
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp}
-import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-/**
- * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a Gearpump
- * Streaming Application.
- *
- * @param graph
- * @param system
- */
-class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
-
- import RemoteMaterializerImpl._
-
- type Clue = String
- private implicit val actorSystem = system
-
- private def uuid: String = {
- java.util.UUID.randomUUID.toString
- }
-
- /**
- * @return a mapping from Module to Materialized Processor Id.
- */
- def materialize: (StreamApplication, Map[Module, ProcessorId]) = {
- val (opGraph, clues) = toOpGraph()
- val app: StreamApplication = new StreamApp("app", system, UserConfig.empty, opGraph)
- val processorIds = resolveClues(app, clues)
-
- val updatedApp = updateJunctionConfig(processorIds, app)
- (cleanClues(updatedApp), processorIds)
- }
-
- private def updateJunctionConfig(processorIds: Map[Module, ProcessorId], app: StreamApplication): StreamApplication = {
- val config = junctionConfig(processorIds)
-
- val dag = app.dag.mapVertex { vertex =>
- val processorId = vertex.id
- val newConf = vertex.taskConf.withConfig(config(processorId))
- vertex.copy(taskConf = newConf)
- }
- new StreamApplication(app.name, app.inputUserConfig, dag)
- }
-
- /**
- * Update junction config so that each GraphTask know its upstream and downstream.
- * @param processorIds
- * @return
- */
- private def junctionConfig(processorIds: Map[Module, ProcessorId]): Map[ProcessorId, UserConfig] = {
- val updatedConfigs = graph.vertices.map { vertex =>
- val processorId = processorIds(vertex)
- vertex match {
- case junction: JunctionModule =>
- val inProcessors = junction.shape.inlets.map { inlet =>
- val upstreamModule = graph.incomingEdgesOf(junction).find(_._2.to == inlet).map(_._1)
- val upstreamProcessorId = processorIds(upstreamModule.get)
- upstreamProcessorId
- }.toList
-
- val outProcessors = junction.shape.outlets.map { outlet =>
- val downstreamModule = graph.outgoingEdgesOf(junction).find(_._2.from == outlet).map(_._3)
- val downstreamProcessorId = downstreamModule.map(processorIds(_))
- downstreamProcessorId.get
- }.toList
-
- (processorId, UserConfig.empty.withValue(GraphTask.OUT_PROCESSORS, outProcessors)
- .withValue(GraphTask.IN_PROCESSORS, inProcessors))
- case _ =>
- (processorId, UserConfig.empty)
- }
- }.toMap
- updatedConfigs
- }
-
- private def resolveClues(app: StreamApplication, clues: Map[Module, Clue]): Map[Module, ProcessorId] = {
- clues.flatMap { kv =>
- val (module, clue) = kv
- val processorId = app.dag.vertices.find { processor =>
- processor.taskConf.getString(clue).isDefined
- }.map(_.id)
- processorId.map((module, _))
- }
- }
-
- private def cleanClues(app: StreamApplication): StreamApplication = {
- val graph = app.dag.mapVertex { processor =>
- val conf = cleanClue(processor.taskConf)
- processor.copy(taskConf = conf)
- }
- new StreamApplication(app.name, app.inputUserConfig, graph)
- }
-
- private def cleanClue(conf: UserConfig): UserConfig = {
- conf.filter { kv =>
- kv._2 != RemoteMaterializerImpl.STAINS
- }
- }
-
- private def toOpGraph(): (Graph[Op, OpEdge], Map[Module, Clue]) = {
- var matValues = Map.empty[Module, Clue]
- val opGraph = graph.mapVertex{ module =>
- val name = uuid
- val conf = UserConfig.empty.withString(name, RemoteMaterializerImpl.STAINS)
- matValues += module -> name
- val parallelism = GearAttributes.count(module.attributes)
- val op = module match {
- case source: SourceTaskModule[t] =>
- val updatedConf = conf.withConfig(source.conf)
- new DataSourceOp[t](source.source, parallelism, updatedConf, "source")
- case sink: SinkTaskModule[t] =>
- val updatedConf = conf.withConfig(sink.conf)
- new DataSinkOp[t](sink.sink, parallelism, updatedConf, "sink")
- case sourceBridge: SourceBridgeModule[_, _] =>
- new ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source")
- case processor: ProcessorModule[_, _, _] =>
- val updatedConf = conf.withConfig(processor.conf)
- new ProcessorOp(processor.processor, parallelism, updatedConf, "source")
- case sinkBridge: SinkBridgeModule[_, _] =>
- new ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
- case groupBy: GroupByModule[t, g] =>
- new GroupByOp[t, g](groupBy.groupBy, parallelism, "groupBy", conf)
- case reduce: ReduceModule[Any] =>
- reduceOp(reduce.f, conf)
- case stage: StageModule =>
- translateStage(stage, conf)
- case fanIn: FanInModule =>
- translateFanIn(fanIn, graph.incomingEdgesOf(fanIn), parallelism, conf)
- case fanOut: FanOutModule =>
- translateFanOut(fanOut, graph.outgoingEdgesOf(fanOut), parallelism, conf)
- }
-
- if (op == null) {
- throw new UnsupportedOperationException(module.getClass.toString + " is not supported with RemoteMaterializer")
- }
- op
- }.mapEdge[OpEdge]{(n1, edge, n2) =>
- n2 match {
- case master: MasterOp =>
- Shuffle
- case slave: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] =>
- Shuffle
- case slave: SlaveOp[_] =>
- Direct
- }
- }
- (opGraph, matValues)
- }
-
- private def translateStage(module: StageModule, conf: UserConfig): Op = {
- module match {
- case buffer: Stages.Buffer =>
- //ignore the buffering operation
- identity("buffer", conf)
- case collect: Stages.Collect =>
- collectOp(collect.pf, conf)
- case concatAll: Stages.ConcatAll =>
- //TODO:
- null
- case conflat: Stages.Conflate =>
- conflatOp(conflat.seed, conflat.aggregate, conf)
- case drop: Stages.Drop =>
- dropOp(drop.n, conf)
- case dropWhile: Stages.DropWhile =>
- dropWhileOp(dropWhile.p, conf)
- case expand: Stages.Expand =>
- //TODO
- null
- case filter: Stages.Filter =>
- filterOp(filter.p, conf)
- case fold: Stages.Fold =>
- foldOp(fold.zero, fold.f, conf)
- case groupBy: Stages.GroupBy =>
- //TODO
- null
- case grouped: Stages.Grouped =>
- groupedOp(grouped.n, conf)
- case _: Stages.Identity =>
- identity("identity", conf)
- case log: Stages.Log =>
- logOp(log.name, log.extract, conf)
- case map: Stages.Map =>
- mapOp(map.f, conf)
- case mapAsync: Stages.MapAsync =>
- //TODO
- null
- case mapAsync: Stages.MapAsyncUnordered =>
- //TODO
- null
- case flatMap: Stages.MapConcat =>
- flatMapOp(flatMap.f, "mapConcat", conf)
- case stage: MaterializingStageFactory =>
- //TODO
- null
- case prefixAndTail: Stages.PrefixAndTail =>
- //TODO
- null
- case recover: Stages.Recover =>
- //TODO: we will just ignore this
- identity("recover", conf)
- case scan: Stages.Scan =>
- scanOp(scan.zero, scan.f, conf)
- case split: Stages.Split =>
- //TODO
- null
- case stage: Stages.StageFactory =>
- //TODO
- null
- case take: Stages.Take =>
- takeOp(take.n, conf)
- case takeWhile: Stages.TakeWhile =>
- filterOp(takeWhile.p, conf)
- case time: Stages.TimerTransform =>
- //TODO
- null
- }
- }
-
- private def translateFanIn(
- fanIn: FanInModule,
- edges: List[(Module, Edge, Module)],
- parallelism: Int,
- conf: UserConfig): Op = {
- fanIn match {
- case merge: MergeModule[_] =>
- MergeOp("merge", conf)
- case mergePrefered: MergePreferredModule[_] =>
- //TODO, support "prefer" merge
- MergeOp("mergePrefered", conf)
- case zip: ZipWithModule =>
- //TODO: support zip module
- null
- case concat: ConcatModule[_] =>
- //TODO: support concat module
- null
- case flexiMerge: FlexiMergeModule[_, _] =>
- //TODO: Suport flexi merge module
- null
- }
- }
-
- private def translateFanOut(
- fanOut: FanOutModule,
- edges: List[(Module, Edge, Module)],
- parallelism: Int,
- conf: UserConfig): Op = {
- fanOut match {
- case unzip2: UnzipWith2Module[Any, Any, Any] =>
- val updatedConf = conf.withValue(UnZip2Task.UNZIP2_FUNCTION, new UnZip2Task.UnZipFunction(unzip2.f))
- new ProcessorOp(classOf[UnZip2Task], parallelism, updatedConf, "unzip")
- case broadcast: BroadcastModule[_] =>
- new ProcessorOp(classOf[BroadcastTask], parallelism, conf, "broadcast")
- case broadcast: BalanceModule[_] =>
- new ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance")
- case flexi: FlexiRouteImpl[_, _] =>
- //TODO
- null
- }
- }
-}
-
-object RemoteMaterializerImpl {
- final val NotApplied: Any => Any = _ => NotApplied
-
- def collectOp(collect: PartialFunction[Any, Any], conf: UserConfig): Op = {
- flatMapOp({ data =>
- collect.applyOrElse(data, NotApplied) match {
- case NotApplied => None
- case result: Any => Option(result)
- }
- }, "collect", conf)
- }
-
- def filterOp(filter: Any => Boolean, conf: UserConfig): Op = {
- flatMapOp({ data =>
- if (filter(data)) Option(data) else None
- }, "filter", conf)
- }
-
- def reduceOp(reduce: (Any, Any) => Any, conf: UserConfig): Op = {
- var result: Any = null
- val flatMap = { elem: Any =>
- if (result == null) {
- result = elem
- } else {
- result = reduce(result, elem)
- }
- List(result)
- }
- flatMapOp(flatMap, "reduce", conf)
- }
-
- def identity(description: String, conf: UserConfig): Op = {
- flatMapOp({ data =>
- List(data)
- }, description, conf)
- }
-
- def mapOp(map: Any => Any, conf: UserConfig): Op = {
- flatMapOp({ data: Any =>
- List(map(data))
- }, "map", conf)
- }
-
- def flatMapOp(flatMap: Any => Iterable[Any], conf: UserConfig): Op = {
- flatMapOp(flatMap, "flatmap", conf)
- }
-
- def flatMapOp(fun: Any => TraversableOnce[Any], description: String, conf: UserConfig): Op = {
- FlatMapOp(fun, description, conf)
- }
-
- def conflatOp(seed: Any => Any, aggregate: (Any, Any) => Any, conf: UserConfig): Op = {
- var agg: Any = null
- val flatMap = { elem: Any =>
- agg = if (agg == null) {
- seed(elem)
- } else {
- aggregate(agg, elem)
- }
- List(agg)
- }
-
- flatMapOp(flatMap, "map", conf)
- }
-
- def foldOp(zero: Any, fold: (Any, Any) => Any, conf: UserConfig): Op = {
- var aggregator: Any = zero
- val map = { elem: Any =>
- aggregator = fold(aggregator, elem)
- List(aggregator)
- }
- flatMapOp(map, "fold", conf)
- }
-
- def groupedOp(count: Int, conf: UserConfig): Op = {
- var left = count
- val buf = {
- val b = Vector.newBuilder[Any]
- b.sizeHint(count)
- b
- }
-
- val flatMap: Any => Iterable[Any] = { input: Any =>
- buf += input
- left -= 1
- if (left == 0) {
- val emit = buf.result()
- buf.clear()
- left = count
- Some(emit)
- } else {
- None
- }
- }
- flatMapOp(flatMap, conf: UserConfig)
- }
-
- def dropOp(number: Long, conf: UserConfig): Op = {
- var left = number
- val flatMap: Any => Iterable[Any] = { input: Any =>
- if (left > 0) {
- left -= 1
- None
- } else {
- Some(input)
- }
- }
- flatMapOp(flatMap, "drop", conf)
- }
-
- def dropWhileOp(drop: Any => Boolean, conf: UserConfig): Op = {
- flatMapOp({ data =>
- if (drop(data)) None else Option(data)
- }, "dropWhile", conf)
- }
-
- def logOp(name: String, extract: Any => Any, conf: UserConfig): Op = {
- val flatMap = { elem: Any =>
- LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}")
- List(elem)
- }
- flatMapOp(flatMap, "log", conf)
- }
-
- def scanOp(zero: Any, f: (Any, Any) => Any, conf: UserConfig): Op = {
- var aggregator = zero
- var pushedZero = false
-
- val flatMap = { elem: Any =>
- aggregator = f(aggregator, elem)
-
- if (pushedZero) {
- pushedZero = true
- List(zero, aggregator)
- } else {
- List(aggregator)
- }
- }
- flatMapOp(flatMap, "scan", conf)
- }
-
- def takeOp(count: Long, conf: UserConfig): Op = {
- var left: Long = count
-
- val filter: Any => Iterable[Any] = { elem: Any =>
- left -= 1
- if (left > 0) Some(elem)
- else if (left == 0) Some(elem)
- else None
- }
- flatMapOp(filter, "take", conf)
- }
-
- /**
- * We use stains to track how module maps to Processor
- *
- */
- val STAINS = "track how module is fused to processor"
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
deleted file mode 100644
index c5dfc9a..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-import org.reactivestreams.{Publisher, Subscriber}
-
-/**
- *
- *
- * [[IN]] -> [[BridgeModule]] -> [[OUT]]
- * /
- * /
- * out of band data input or output channel [[MAT]]
- *
- *
- * [[BridgeModule]] is used as a bridge between different materializers.
- * Different [[akka.stream.Materializer]]s can use out of band channel to
- * exchange messages.
- *
- * For example:
- *
- * Remote Materializer
- * -----------------------------
- * | |
- * | BridgeModule -> RemoteSink |
- * | / |
- * --/----------------------------
- * Local Materializer / out of band channel.
- * ----------------------/----
- * | Local / |
- * | Source -> BridgeModule |
- * | |
- * ---------------------------
- *
- *
- * Typically [[BridgeModule]] is created implicitly as a temporary intermediate
- * module during materialization.
- *
- * However, user can still declare it explicitly. In this case, it means we have a
- * boundary Source or Sink module which accept out of band channel inputs or
- * outputs.
- *
- *
- * @tparam IN
- * @tparam OUT
- * @tparam MAT
- */
-abstract class BridgeModule[IN, OUT, MAT] extends FlowModule[IN, OUT, MAT] {
- def attributes: Attributes
- def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT]
-
- protected def newInstance: BridgeModule[IN, OUT, MAT]
- override def carbonCopy: Module = newInstance
-}
-
-/**
- *
- * Bridge module which accept out of band channel Input
- * [[org.reactivestreams.Publisher]][IN].
- *
- *
- * [[SourceBridgeModule]] -> [[OUT]]
- * /|
- * /
- * out of band data input [[org.reactivestreams.Publisher]][IN]
- *
- * @see [[BridgeModule]]
- *
- * @param attributes
- * @tparam IN, input data type from out of band [[org.reactivestreams.Publisher]]
- * @tparam OUT out put data type to next module.
- */
-class SourceBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Subscriber[IN]] {
- override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] = new SourceBridgeModule[IN, OUT](attributes)
-
- override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Subscriber[IN]] = {
- new SourceBridgeModule(attributes)
- }
-}
-
-/**
- *
- * Bridge module which accept out of band channel Output
- * [[org.reactivestreams.Subscriber]][OUT].
- *
- *
- * [[IN]] -> [[BridgeModule]]
- * \
- * \
- * \|
- * out of band data output [[org.reactivestreams.Subscriber]][OUT]
- *
- * @see [[BridgeModule]]
- *
- * @param attributes
- * @tparam IN, input data type from previous module
- * @tparam OUT out put data type to out of band subscriber
- */
-class SinkBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Publisher[OUT]] {
- override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] = new SinkBridgeModule[IN, OUT](attributes)
-
- override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Publisher[OUT]] = {
- new SinkBridgeModule[IN, OUT](attributes)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
deleted file mode 100644
index bc744f9..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{SinkModule, SourceModule}
-import akka.stream.{Attributes, MaterializationContext, SinkShape, SourceShape}
-import org.reactivestreams.{Publisher, Subscriber}
-
-/**
- * [[DummyModule]] is a set of special module to help construct a RunnableGraph,
- * so that all ports are closed.
- *
- * In runtime, [[DummyModule]] should be ignored during materialization.
- *
- * For example, if you have a [[BridgeModule]] which only accept the input
- * message from out of band channel, then you can use DummySource to fake
- * a Message Source Like this.
- *
- * [[DummySource]] -> [[BridgeModule]] -> Sink
- * /|
- * /
- * out of band input message [[Publisher]]
- *
- * After materialization, [[DummySource]] will be removed.
-
- * [[BridgeModule]] -> Sink
- * /|
- * /
- * [[akka.stream.impl.PublisherSource]]
- *
- *
- */
-trait DummyModule extends Module
-
-/**
- *
- * [[DummySource]]-> [[BridgeModule]] -> Sink
- * /|
- * /
- * out of band input message Source
- *
- * @param attributes
- * @param shape
- * @tparam Out
- */
-class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out])
- extends SourceModule[Out, Unit](shape) with DummyModule {
-
- override def create(context: MaterializationContext): (Publisher[Out], Unit) = {
- throw new UnsupportedOperationException()
- }
-
- override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = {
- new DummySource[Out](attributes, shape)
- }
-
- override def withAttributes(attr: Attributes): Module = {
- new DummySource(attr, amendShape(attr))
- }
-}
-
-/**
- *
- * Source-> [[BridgeModule]] -> [[DummySink]]
- * \
- * \
- * \|
- * out of band output message [[Subscriber]]
- *
- * @param attributes
- * @param shape
- */
-class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN])
- extends SinkModule[IN, Unit](shape) with DummyModule {
- override def create(context: MaterializationContext): (Subscriber[IN], Unit) = {
- throw new UnsupportedOperationException()
- }
-
- override protected def newInstance(shape: SinkShape[IN]): SinkModule[IN, Unit] = {
- new DummySink[IN](attributes, shape)
- }
-
- override def withAttributes(attr: Attributes): Module = {
- new DummySink[IN](attr, amendShape(attr))
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
deleted file mode 100644
index c4c78cc..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.{Attributes, Inlet, Outlet, Shape, SinkShape, SourceShape}
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.Task
-
-/**
- * [[GearpumpTaskModule]] represent modules that can be materialized as Gearpump Tasks.
- *
- * This is specially designed for Gearpump runtime. It is not supposed to be used
- * for local materializer.
- *
- */
-trait GearpumpTaskModule extends Module
-
-/**
- * This is used to represent the Gearpump Data Source
- * @param source
- * @param conf
- * @param shape
- * @param attributes
- * @tparam T
- */
-final case class SourceTaskModule[T](
- source: DataSource,
- conf: UserConfig,
- shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")),
- attributes: Attributes = Attributes.name("SourceTaskModule"))
- extends GearpumpTaskModule {
-
- override def subModules: Set[Module] = Set.empty
- override def withAttributes(attr: Attributes): Module = {
- this.copy(shape = amendShape(attr), attributes = attr)
- }
- override def carbonCopy: Module = {
- this.copy(shape = SourceShape(Outlet[T]("SourceTaskModule.out")))
- }
-
- override def replaceShape(s: Shape): Module =
- if (s == shape) this
- else throw new UnsupportedOperationException("cannot replace the shape of SourceTaskModule")
-
- private def amendShape(attr: Attributes): SourceShape[T] = {
- val thisN = attributes.nameOrDefault(null)
- val thatN = attr.nameOrDefault(null)
-
- if ((thatN eq null) || thisN == thatN) shape
- else shape.copy(outlet = Outlet(thatN + ".out"))
- }
-}
-
-/**
- * This is used to represent the Gearpump Data Sink
- * @param sink
- * @param conf
- * @param shape
- * @param attributes
- * @tparam IN
- */
-final case class SinkTaskModule[IN](
- sink: DataSink,
- conf: UserConfig,
- shape: SinkShape[IN] = SinkShape[IN](Inlet[IN]("SinkTaskModule.in")),
- attributes: Attributes = Attributes.name("SinkTaskModule"))
- extends GearpumpTaskModule {
-
- override def subModules: Set[Module] = Set.empty
- override def withAttributes(attr: Attributes): Module = {
- this.copy(shape = amendShape(attr), attributes = attr)
- }
- override def carbonCopy: Module = this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in")))
-
- override def replaceShape(s: Shape): Module =
- if (s == shape) this
- else throw new UnsupportedOperationException("cannot replace the shape of SinkTaskModule")
-
- private def amendShape(attr: Attributes): SinkShape[IN] = {
- val thisN = attributes.nameOrDefault(null)
- val thatN = attr.nameOrDefault(null)
-
- if ((thatN eq null) || thisN == thatN) shape
- else shape.copy(inlet = Inlet(thatN + ".out"))
- }
-}
-
-/**
- * This is to represent the Gearpump Processor which has exact one input and one output
- * @param processor
- * @param conf
- * @param attributes
- * @tparam IN
- * @tparam OUT
- * @tparam Unit
- */
-case class ProcessorModule[IN, OUT, Unit](
- processor: Class[_ <: Task],
- conf: UserConfig,
- val attributes: Attributes = Attributes.name("processorModule"))
- extends FlowModule[IN, OUT, Unit] with GearpumpTaskModule {
-
- override def carbonCopy: Module = newInstance
-
- protected def newInstance: ProcessorModule[IN, OUT, Unit] = {
- new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
- }
-
- override def withAttributes(attributes: Attributes): ProcessorModule[IN, OUT, Unit] = {
- new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
deleted file mode 100644
index e57a6f6..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-
-/**
- *
- * Group the T value groupBy function
- *
- * @param f
- * @param attributes
- * @tparam T
- * @tparam Group
- */
-case class GroupByModule[T, Group](val groupBy: T => Group,
- val attributes: Attributes = Attributes.name("groupByModule")) extends FlowModule[T, T, Unit] {
-
- override def carbonCopy: Module = newInstance
-
- protected def newInstance: GroupByModule[T, Group] = {
- new GroupByModule[T, Group](groupBy, attributes)
- }
-
- override def withAttributes(attributes: Attributes): GroupByModule[T, Group] = {
- new GroupByModule[T, Group](groupBy, attributes)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
deleted file mode 100644
index 926feb6..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-
-/**
- *
- * Reduce Module
- *
- * @param f
- * @param attributes
- * @tparam T
- */
-case class ReduceModule[T](
- val f: (T, T) => T, val attributes: Attributes = Attributes.name("reduceModule"))
- extends FlowModule[T, T, Unit] {
-
- override def carbonCopy: Module = newInstance
-
- protected def newInstance: ReduceModule[T] = new ReduceModule[T](f, attributes)
-
- override def withAttributes(attributes: Attributes): ReduceModule[T] = {
- new ReduceModule[T](f, attributes)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
deleted file mode 100644
index 9cc46c9..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.scaladsl
-
-import akka.stream.Attributes
-import akka.stream.gearpump.module.{DummySink, DummySource, GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
-import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
-import org.reactivestreams.{Publisher, Subscriber}
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.Task
-
-object GearSource {
-
- /**
- * Construct a Source which accepts out of band input messages.
- *
- * [[SourceBridgeModule]] -> Sink
- * /
- * /
- * V
- * materialize to [[Subscriber]]
- * /|
- * /
- * upstream [[Publisher]] send out of band message
- *
- */
- def bridge[IN, OUT]: Source[OUT, Subscriber[IN]] = {
- val source = new Source(new DummySource[IN](Attributes.name("dummy"), Source.shape("dummy")))
- val flow = new Flow[IN, OUT, Subscriber[IN]](new SourceBridgeModule[IN, OUT]())
- source.viaMat(flow)(Keep.right)
- }
-
- /**
- * Construct a Source from Gearpump [[DataSource]].
- *
- * [[SourceTaskModule]] -> downstream Sink
- *
- */
- def from[OUT](source: DataSource): Source[OUT, Unit] = {
- val taskSource = new Source[OUT, Unit](new SourceTaskModule(source, UserConfig.empty))
- taskSource
- }
-
- /**
- * Construct a Source from Gearpump [[org.apache.gearpump.streaming.Processor]].
- *
- * [[ProcessorModule]] -> downstream Sink
- *
- */
- def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, Unit] = {
- val source = new Source(new DummySource[Unit](Attributes.name("dummy"), Source.shape("dummy")))
- val flow = Processor.apply[Unit, OUT](processor, conf)
- source.viaMat(flow)(Keep.right)
- }
-}
-
-object GearSink {
-
- /**
- * Construct a Sink which output messages to a out of band channel.
- *
- * Souce -> [[SinkBridgeModule]]
- * \
- * \|
- * materialize to [[Publisher]]
- * \
- * \
- * \|
- * send out of band message to downstream [[Subscriber]]
- *
- */
- def bridge[IN, OUT]: Sink[IN, Publisher[OUT]] = {
- val sink = new Sink(new DummySink[OUT](Attributes.name("dummy"), Sink.shape("dummy")))
- val flow = new Flow[IN, OUT, Publisher[OUT]](new SinkBridgeModule[IN, OUT]())
- flow.to(sink)
- }
-
- /**
- * Construct a Sink from Gearpump [[DataSink]].
- *
- * Upstream Source -> [[SinkTaskModule]]
- *
- */
- def to[IN](sink: DataSink): Sink[IN, Unit] = {
- val taskSink = new Sink[IN, Unit](new SinkTaskModule(sink, UserConfig.empty))
- taskSink
- }
-
- /**
- * Construct a Sink from Gearpump [[org.apache.gearpump.streaming.Processor]].
- *
- * Upstream Source -> [[ProcessorModule]]
- *
- */
- def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = {
- val sink = new Sink(new DummySink[Unit](Attributes.name("dummy"), Sink.shape("dummy")))
- val flow = Processor.apply[IN, Unit](processor, conf)
- flow.to(sink)
- }
-}
-
-/**
- *
- * GroupBy will divide the main input stream to a set of sub-streams.
- * This is a work-around to bypass the limitation of official API Flow.groupBy
- *
- *
- * For example, to do a word count, we can write code like this:
- *
- * case class KV(key: String, value: String)
- * case class Count(key: String, count: Int)
- *
- * val flow: Flow[KV] = GroupBy[KV](foo).map{ kv =>
- * Count(kv.key, 1)
- * }.fold(Count(null, 0)){(base, add) =>
- * Count(add.key, base.count + add.count)
- * }.log("count of current key")
- * .flatten()
- * .to(sink)
- *
- * map, fold will transform data on all sub-streams, If there are 10 groups,
- * then there will be 10 sub-streams, and for each sub-stream, there will be
- * a map and fold.
- *
- * flatten will collect all sub-stream into the main stream,
- *
- * sink will only operate on the main stream.
- *
- */
-object GroupBy {
- def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = {
- new Flow[T, T, Unit](new GroupByModule(groupBy))
- }
-}
-
-/**
- * Aggregate on the data.
- *
- * val flow = Reduce({(a: Int, b: Int) => a + b})
- *
- *
- */
-object Reduce {
- def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = {
- new Flow[T, T, Unit](new ReduceModule(reduce))
- }
-}
-
-/**
- * Create a Flow by providing a Gearpump Processor class and configuration
- *
- *
- */
-object Processor {
- def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, Out, Unit] = {
- new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, conf))
- }
-}
-
-object Implicits {
-
- /**
- * Help util to support reduce and groupBy
- */
- implicit class SourceOps[T, Mat](source: Source[T, Mat]) {
-
- //TODO It is named as groupBy2 to avoid conflict with built-in
- // groupBy. Eventually, we think the built-in groupBy should
- // be replace with this implementation.
- def groupBy2[Group](groupBy: T => Group): Source[T, Mat] = {
- val stage = GroupBy.apply(groupBy)
- source.via[T, Unit](stage)
- }
-
- def reduce(reduce: (T, T) => T): Source[T, Mat] = {
- val stage = Reduce.apply(reduce)
- source.via[T, Unit](stage)
- }
-
- def process[R](processor: Class[_ <: Task], conf: UserConfig): Source[R, Mat] = {
- val stage = Processor.apply[T, R](processor, conf)
- source.via(stage)
- }
- }
-
- /**
- * Help util to support reduce and groupBy
- */
- implicit class FlowOps[IN, OUT, Mat](flow: Flow[IN, OUT, Mat]) {
- def groupBy2[Group](groupBy: OUT => Group): Flow[IN, OUT, Mat] = {
- val stage = GroupBy.apply(groupBy)
- flow.via(stage)
- }
-
- def reduce(reduce: (OUT, OUT) => OUT): Flow[IN, OUT, Mat] = {
- val stage = Reduce.apply(reduce)
- flow.via(stage)
- }
-
- def process[R](processor: Class[_ <: Task], conf: UserConfig): Flow[IN, R, Mat] = {
- val stage = Processor.apply[OUT, R](processor, conf)
- flow.via(stage)
- }
- }
-
- /**
- * Help util to support groupByKey and sum
- */
- implicit class KVSourceOps[K, V, Mat](source: Source[(K, V), Mat]) {
-
- /**
- * if it is a KV Pair, we can group the KV pair by the key.
- * @return
- */
- def groupByKey: Source[(K, V), Mat] = {
- val stage = GroupBy.apply(getTupleKey[K, V])
- source.via(stage)
- }
-
- /**
- * Does sum on values
- *
- * Before doing this, you need to do groupByKey to group same key together
- * , otherwise, it will do the sum no matter what current key is.
- */
- def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = {
- val stage = Reduce.apply(sumByKey[K, V](numeric))
- source.via(stage)
- }
- }
-
- /**
- * Helper util to support groupByKey and sum
- */
- implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {
-
- /**
- * If it is a KV Pair, we can group the KV pair by the key.
- *
- */
- def groupByKey: Flow[(K, V), (K, V), Mat] = {
- val stage = GroupBy.apply(getTupleKey[K, V])
- flow.via(stage)
- }
-
- /**
- * do sum on values
- *
- * Before doing this, you need to do groupByKey to group same key together
- * , otherwise, it will do the sum no matter what current key is.
- *
- */
- def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = {
- val stage = Reduce.apply(sumByKey[K, V](numeric))
- flow.via(stage)
- }
- }
-
- private def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
-
- private def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] =
- (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
deleted file mode 100644
index 2eb0612..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.TaskContext
-
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
-
- val sizeOfOutputs = sizeOfOutPorts
- var index = 0
-
- override def onNext(msg: Message): Unit = {
- output(index, msg)
- index += 1
- if (index == sizeOfOutputs) {
- index = 0
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
deleted file mode 100644
index 925bf21..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.TaskContext
-
-class BroadcastTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
- override def onNext(msg: Message): Unit = {
- context.output(msg)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
deleted file mode 100644
index 9a4e24e..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import akka.stream.gearpump.task.GraphTask.{Index, PortId}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskWrapper}
-
-class GraphTask(inputTaskContext: TaskContext, userConf: UserConfig)
- extends Task(inputTaskContext, userConf) {
-
- private val context = inputTaskContext.asInstanceOf[TaskWrapper]
- private val outMapping = portsMapping(userConf.getValue[List[ProcessorId]](
- GraphTask.OUT_PROCESSORS).get)
- private val inMapping = portsMapping(userConf.getValue[List[ProcessorId]](
- GraphTask.IN_PROCESSORS).get)
-
- val sizeOfOutPorts = outMapping.keys.size
- val sizeOfInPorts = inMapping.keys.size
-
- private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] = {
- val portToProcessor = processors.zipWithIndex.map { kv =>
- (kv._2, kv._1)
- }.toMap
-
- val processorToIndex = processors.sorted.zipWithIndex.toMap
-
- val portToIndex = portToProcessor.map { kv =>
- val (outlet, processorId) = kv
- val index = processorToIndex(processorId)
- (outlet, index)
- }
- portToIndex
- }
-
- def output(outletId: Int, msg: Message): Unit = {
- context.output(outMapping(outletId), msg)
- }
-
- override def onStart(startTime: StartTime): Unit = {}
-
- override def onStop(): Unit = {}
-}
-
-object GraphTask {
- val OUT_PROCESSORS = "akka.stream.gearpump.task.outprocessors"
- val IN_PROCESSORS = "akka.stream.gearpump.task.inprocessors"
-
- type PortId = Int
- type Index = Int
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
deleted file mode 100644
index b681852..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import java.util
-import java.util.concurrent.TimeUnit
-
-import akka.actor.Actor.Receive
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.stream.gearpump.task.SinkBridgeTask.RequestMessage
-import akka.util.Timeout
-import org.reactivestreams.{Publisher, Subscriber, Subscription}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Bridge Task when data flow is from remote Gearpump Task to local Akka-Stream Module
- *
- *
- * upstream [[Task]] -> [[SinkBridgeTask]]
- * \ Remote Cluster
- * -------------------------\----------------------
- * \ Local JVM
- * \|
- * Akka Stream [[Subscriber]]
- *
- */
-class SinkBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
- import taskContext.taskId
-
- val queue = new util.LinkedList[Message]()
- var subscriber: ActorRef = null
-
- var request: Int = 0
-
- override def onStart(startTime: StartTime): Unit = {}
-
- override def onNext(msg: Message): Unit = {
- queue.add(msg)
- trySendingData()
- }
-
- override def onStop(): Unit = {}
-
- private def trySendingData(): Unit = {
- if (subscriber != null) {
- (0 to request).map(_ => queue.poll()).filter(_ != null).foreach { msg =>
- subscriber ! msg.msg
- request -= 1
- }
- }
- }
-
- override def receiveUnManagedMessage: Receive = {
- case RequestMessage(n) =>
- this.subscriber = sender
- LOG.info("the downstream has requested " + n + " messages from " + subscriber)
- request += n.toInt
- trySendingData()
- case msg =>
- LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString)
- }
-}
-
-object SinkBridgeTask {
-
- case class RequestMessage(number: Int)
-
- class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, appId: Int, processorId: ProcessorId) extends Publisher[AnyRef] with Subscription {
- private val taskId = TaskId(processorId, index = 0)
-
- private val LOG = LogUtil.getLogger(getClass)
-
- private var actor: ActorRef = null
- import system.dispatcher
-
- private val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container =>
- // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task)
- container.task
- }
-
- override def subscribe(subscriber: Subscriber[_ >: AnyRef]): Unit = {
- this.actor = system.actorOf(Props(new ClientActor(subscriber)))
- subscriber.onSubscribe(this)
- }
-
- override def cancel(): Unit = Unit
-
- private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-
- override def request(l: Long): Unit = {
- task.foreach { task =>
- task.tell(RequestMessage(l.toInt), actor)
- }
- }
- }
-
- class ClientActor(subscriber: Subscriber[_ >: AnyRef]) extends Actor {
- def receive: Receive = {
- case result: AnyRef => subscriber.onNext(result)
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
deleted file mode 100644
index ccbd350..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import scala.concurrent.ExecutionContext
-
-import akka.actor.Actor.Receive
-import akka.stream.gearpump.task.SourceBridgeTask.{AkkaStreamMessage, Complete, Error}
-import org.reactivestreams.{Subscriber, Subscription}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-/**
- * Bridge Task when data flow is from local Akka-Stream Module to remote Gearpump Task
- *
- *
- *
- * [[SourceBridgeTask]] --> downstream [[Task]]
- * /| Remote Cluster
- * ---------------/--------------------------------
- * / Local JVM
- * Akka Stream [[org.reactivestreams.Publisher]]
- *
- */
-class SourceBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
- import taskContext.taskId
-
- override def onStart(startTime: StartTime): Unit = {}
-
- override def onNext(msg: Message): Unit = {
- LOG.info("AkkaStreamSource receiving message " + msg)
- }
-
- override def onStop(): Unit = {}
-
- override def receiveUnManagedMessage: Receive = {
- case Error(ex) =>
- LOG.error("the stream has error", ex)
- case AkkaStreamMessage(msg) =>
- LOG.error("we have received message from akka stream source: " + msg)
- taskContext.output(Message(msg, System.currentTimeMillis()))
- case Complete(description) =>
- LOG.error("the stream is completed: " + description)
- case msg =>
- LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString)
- }
-}
-
-object SourceBridgeTask {
- case class Error(ex: java.lang.Throwable)
-
- case class Complete(description: String)
-
- case class AkkaStreamMessage(msg: AnyRef)
-
- class SourceBridgeTaskClient[T <: AnyRef](ec: ExecutionContext, context: ClientContext, appId: Int, processorId: ProcessorId) extends Subscriber[T] {
- val taskId = TaskId(processorId, 0)
- var subscription: Subscription = null
- implicit val dispatcher = ec
-
- val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container =>
- // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task)
- container.task
- }
-
- override def onError(throwable: Throwable): Unit = {
- task.map(task => task ! Error(throwable))
- }
-
- override def onSubscribe(subscription: Subscription): Unit = {
- this.subscription = subscription
- task.map(task => subscription.request(1))
- }
-
- override def onComplete(): Unit = {
- task.map(task => task ! Complete("the upstream is completed"))
- }
-
- override def onNext(t: T): Unit = {
- task.map { task =>
- task ! AkkaStreamMessage(t)
- }
- subscription.request(1)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
deleted file mode 100644
index 78fabbe..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import akka.stream.gearpump.task.UnZip2Task.UnZipFunction
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.TaskContext
-
-class UnZip2Task(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
-
- val unzip = userConf.getValue[UnZipFunction](UnZip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
-
- override def onNext(msg: Message): Unit = {
- val message = msg.msg
- val time = msg.timestamp
- val pair = unzip(message)
- val (a, b) = pair
- output(0, Message(a.asInstanceOf[AnyRef], time))
- output(1, Message(b.asInstanceOf[AnyRef], time))
- }
-}
-
-object UnZip2Task {
- class UnZipFunction(val unzip: Any => (Any, Any)) extends Serializable
-
- val UNZIP2_FUNCTION = "akka.stream.gearpump.task.unzip2.function"
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
deleted file mode 100644
index c774fc7..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.util
-
-import akka.stream.impl.StreamLayout.{Atomic, Combine, Ignore, MaterializedValueNode, Module, Transform}
-
-class MaterializedValueOps(mat: MaterializedValueNode) {
- def resolve[Mat](materializedValues: Map[Module, Any]): Mat = {
- def resolveMaterialized(mat: MaterializedValueNode, materializedValues: Map[Module, Any]): Any = mat match {
- case Atomic(m) => materializedValues.getOrElse(m, ())
- case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
- case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
- case Ignore => ()
- }
- resolveMaterialized(mat, materializedValues).asInstanceOf[Mat]
- }
-}
-
-object MaterializedValueOps {
- def apply(mat: MaterializedValueNode): MaterializedValueOps = new MaterializedValueOps(mat)
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
new file mode 100644
index 0000000..016a7b2
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import akka.stream.Attributes
+import akka.stream.Attributes.Attribute
+
+object GearAttributes {
+ /**
+ * Define how many parallel instance we want to use to run this module
+ * @param count Int
+ * @return
+ */
+ def count(count: Int): Attributes = Attributes(ParallismAttribute(count))
+
+ /**
+ * Define we want to render this module locally.
+ * @return
+ */
+ def local: Attributes = Attributes(LocationAttribute(Local))
+
+ /**
+ * Define we want to render this module remotely
+ * @return
+ */
+ def remote: Attributes = Attributes(LocationAttribute(Remote))
+
+ /**
+ * Get the effective location settings if child override the parent
+ * setttings.
+ *
+ * @param attrs Attributes
+ * @return
+ */
+ def location(attrs: Attributes): Location = {
+ attrs.attributeList.foldLeft(Local: Location) { (s, attr) =>
+ attr match {
+ case LocationAttribute(location) => location
+ case other => s
+ }
+ }
+ }
+
+ /**
+ * get effective parallelism settings if child override parent.
+ * @param attrs Attributes
+ * @return
+ */
+ def count(attrs: Attributes): Int = {
+ attrs.attributeList.foldLeft(1) { (s, attr) =>
+ attr match {
+ case ParallismAttribute(count) => count
+ case other => s
+ }
+ }
+ }
+
+ /**
+ * Where we want to render the module
+ */
+ sealed trait Location
+ object Local extends Location
+ object Remote extends Location
+
+ final case class LocationAttribute(tag: Location) extends Attribute
+
+ /**
+ * How many parallel instance we want to use for this module.
+ *
+ * @param parallelism Int
+ */
+ final case class ParallismAttribute(parallelism: Int) extends Attribute
+}