You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/11 02:33:21 UTC

[4/5] incubator-gearpump git commit: Fixes #22 support akka-streams Gearpump Materializer

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
deleted file mode 100644
index 1ec724e..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/LocalMaterializerImpl.scala
+++ /dev/null
@@ -1,284 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
-
-import akka.actor.{ActorRef, ActorSystem}
-import akka.dispatch.Dispatchers
-import akka.stream.ModuleGraph.{Edge, MaterializedValueSourceAttribute}
-import akka.stream.actor.ActorSubscriber
-import akka.stream.gearpump.materializer.LocalMaterializerImpl.MaterializedModule
-import akka.stream.gearpump.module.ReduceModule
-import akka.stream.gearpump.util.MaterializedValueOps
-import akka.stream.impl.Stages.{DirectProcessor, Fold, StageModule}
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{ActorProcessorFactory, ActorPublisher, ExposedPublisher, FanIn, FanOut, SinkModule, SourceModule, VirtualProcessor}
-import akka.stream.{ActorMaterializerSettings, Attributes, Graph => AkkaGraph, InPort, MaterializationContext, Materializer, OutPort, Shape}
-import org.reactivestreams.{Processor, Publisher, Subscriber}
-
-import org.apache.gearpump.util.Graph
-
-/**
- * This materializer is functional equivalent to [[akka.stream.impl.ActorMaterializerImpl]]
- *
- * @param system
- * @param settings
- * @param dispatchers
- * @param supervisor
- * @param haveShutDown
- * @param flowNameCounter
- * @param namePrefix
- * @param optimizations
- */
-class LocalMaterializerImpl (
-    system: ActorSystem,
-    settings: ActorMaterializerSettings,
-    dispatchers: Dispatchers,
-    supervisor: ActorRef,
-    haveShutDown: AtomicBoolean,
-    flowNameCounter: AtomicLong,
-    namePrefix: String,
-    optimizations: Optimizations)
-  extends LocalMaterializer(
-    system, settings, dispatchers, supervisor,
-    haveShutDown, flowNameCounter, namePrefix, optimizations) {
-
-  override def materialize(graph: Graph[Module, Edge], inputMatValues: Map[Module, Any]): Map[Module, Any] = {
-    val materializedGraph = graph.mapVertex { module =>
-      materializeAtomic(module)
-    }
-
-    materializedGraph.edges.foreach { nodeEdgeNode =>
-      val (node1, edge, node2) = nodeEdgeNode
-      val from = edge.from
-      val to = edge.to
-      val publisher = node1.outputs(from).asInstanceOf[Publisher[Any]]
-      val subscriber = node2.inputs(to).asInstanceOf[Subscriber[Any]]
-      publisher.subscribe(subscriber)
-    }
-
-    val matValues = inputMatValues ++ materializedGraph.vertices.map { vertex =>
-      (vertex.module, vertex.matValue)
-    }.toMap
-
-    val matValueSources = materializedGraph.vertices.filter(_.module.isInstanceOf[MaterializedValueSource[_]])
-    publishToMaterializedValueSource(matValueSources, matValues)
-
-    matValues
-  }
-
-  private def publishToMaterializedValueSource(modules: List[MaterializedModule], matValues: Map[Module, Any]) = {
-    modules.foreach { module =>
-      val source = module.module.asInstanceOf[MaterializedValueSource[_]]
-      val attr = source.attributes.getAttribute(classOf[MaterializedValueSourceAttribute], null)
-
-      Option(attr).map { attr =>
-        val valueToPublish = MaterializedValueOps(attr.mat).resolve[Any](matValues)
-        module.outputs.foreach { portAndPublisher =>
-          val (port, publisher) = portAndPublisher
-          publisher match {
-            case valuePublisher: MaterializedValuePublisher =>
-              valuePublisher.setValue(valueToPublish)
-          }
-        }
-      }
-    }
-  }
-
-  private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
-
-  private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
-
-  val flowName = createFlowName()
-  var nextId = 0
-
-  def stageName(attr: Attributes): String = {
-    val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
-    nextId += 1
-    name
-  }
-
-  private def materializeAtomic(atomic: Module): MaterializedModule = {
-    val effectiveAttributes = atomic.attributes
-
-    def newMaterializationContext() =
-      new MaterializationContext(LocalMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes))
-
-    atomic match {
-      case matValue: MaterializedValueSource[_] =>
-        val pub = new MaterializedValuePublisher
-        val outputs = Map[OutPort, Publisher[_]](matValue.shape.outlet -> pub)
-        MaterializedModule(matValue, (), outputs = outputs)
-      case sink: SinkModule[_, _] =>
-        val (sub, mat) = sink.create(newMaterializationContext())
-        val inputs = Map[InPort, Subscriber[_]](sink.shape.inlet -> sub)
-        MaterializedModule(sink, mat, inputs)
-      case source: SourceModule[_, _] =>
-        val (pub, mat) = source.create(newMaterializationContext())
-        val outputs = Map[OutPort, Publisher[_]](source.shape.outlet -> pub)
-        MaterializedModule(source, mat, outputs = outputs)
-
-      case reduce: ReduceModule[Any] =>
-        //TODO: remove this after the official akka-stream API support the Reduce Module
-        val stage = LocalMaterializerImpl.toFoldModule(reduce)
-        val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
-        val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
-        val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor)
-        MaterializedModule(stage, mat, inputs, outputs)
-
-      case stage: StageModule =>
-        val (processor, mat) = processorFor(stage, effectiveAttributes, effectiveSettings(effectiveAttributes))
-        val inputs = Map[InPort, Subscriber[_]](stage.inPort -> processor)
-        val outputs = Map[OutPort, Publisher[_]](stage.outPort -> processor)
-        MaterializedModule(stage, mat, inputs, outputs)
-      case tls: TlsModule => // TODO solve this so TlsModule doesn't need special treatment here
-        val es = effectiveSettings(effectiveAttributes)
-        val props =
-          SslTlsCipherActor.props(es, tls.sslContext, tls.firstSession, tracing = false, tls.role, tls.closing)
-        val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
-        def factory(id: Int) = new ActorPublisher[Any](impl) {
-          override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
-        }
-        val publishers = Vector.tabulate(2)(factory)
-        impl ! FanOut.ExposedPublishers(publishers)
-
-        val inputs = Map[InPort, Subscriber[_]](
-          tls.plainIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.UserIn),
-          tls.cipherIn -> FanIn.SubInput[Any](impl, SslTlsCipherActor.TransportIn))
-
-        val outputs = Map[OutPort, Publisher[_]](
-          tls.plainOut -> publishers(SslTlsCipherActor.UserOut),
-          tls.cipherOut -> publishers(SslTlsCipherActor.TransportOut))
-        MaterializedModule(tls, (), inputs, outputs)
-
-      case junction: JunctionModule =>
-        materializeJunction(junction, effectiveAttributes, effectiveSettings(effectiveAttributes))
-    }
-  }
-
-  private def processorFor(op: StageModule,
-    effectiveAttributes: Attributes,
-    effectiveSettings: ActorMaterializerSettings): (Processor[Any, Any], Any) = op match {
-    case DirectProcessor(processorFactory, _) => processorFactory()
-    case Identity(attr) => (new VirtualProcessor, ())
-    case _ =>
-      val (opprops, mat) = ActorProcessorFactory.props(LocalMaterializerImpl.this, op, effectiveAttributes)
-      ActorProcessorFactory[Any, Any](
-            actorOf(opprops, stageName(effectiveAttributes), effectiveSettings.dispatcher)) -> mat
-  }
-
-  private def materializeJunction(
-    op: JunctionModule,
-    effectiveAttributes: Attributes,
-    effectiveSettings: ActorMaterializerSettings): MaterializedModule = {
-    op match {
-      case fanin: FanInModule =>
-        val (props, inputs, output) = fanin match {
-
-          case MergeModule(shape, _) =>
-            (FairMerge.props(effectiveSettings, shape.inSeq.size), shape.inSeq, shape.out)
-
-          case f: FlexiMergeModule[_, Shape] =>
-            val flexi = f.flexi(f.shape)
-            val shape: Shape = f.shape
-            (FlexiMerge.props(effectiveSettings, f.shape, flexi), shape.inlets, shape.outlets.head)
-
-          case MergePreferredModule(shape, _) =>
-            (UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inSeq, shape.out)
-
-          case ConcatModule(shape, _) =>
-            require(shape.inSeq.size == 2, "currently only supporting concatenation of exactly two inputs") // TODO
-            (Concat.props(effectiveSettings), shape.inSeq, shape.out)
-
-          case zip: ZipWithModule =>
-            (zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head)
-        }
-
-        val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
-        val publisher = new ActorPublisher[Any](impl)
-        // Resolve cyclic dependency with actor. This MUST be the first message no matter what.
-        impl ! ExposedPublisher(publisher)
-
-        val inputMapping: Map[InPort, Subscriber[_]] = inputs.zipWithIndex.map { pair =>
-          val (in, id) = pair
-          (in, FanIn.SubInput[Any](impl, id))
-        }.toMap
-
-        val outMapping = Map(output -> publisher)
-        MaterializedModule(fanin, (), inputMapping, outMapping)
-
-      case fanout: FanOutModule =>
-        val (props, in, outs) = fanout match {
-
-          case r: FlexiRouteModule[t, Shape] =>
-            val flexi = r.flexi(r.shape)
-            val shape: Shape = r.shape
-            (FlexiRoute.props(effectiveSettings, r.shape, flexi), shape.inlets.head: InPort, r.shape.outlets)
-
-          case BroadcastModule(shape, eagerCancel, _) =>
-            (Broadcast.props(effectiveSettings, eagerCancel, shape.outArray.size), shape.in, shape.outArray.toSeq)
-
-          case BalanceModule(shape, waitForDownstreams, _) =>
-            (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)
-
-          case unzip: UnzipWithModule =>
-            (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets)
-        }
-        val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
-        val size = outs.size
-        def factory(id: Int) =
-          new ActorPublisher[Any](impl) {
-            override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
-          }
-        val publishers =
-          if (outs.size < 8) Vector.tabulate(size)(factory)
-          else List.tabulate(size)(factory)
-
-        impl ! FanOut.ExposedPublishers(publishers)
-        val outputs: Map[OutPort, Publisher[_]] = publishers.iterator.zip(outs.iterator).map { case (pub, out) =>
-          (out, pub)
-        }.toMap
-
-        val inputs: Map[InPort, Subscriber[_]] = Map(in -> ActorSubscriber[Any](impl))
-        MaterializedModule(fanout, (), inputs, outputs)
-    }
-  }
-
-  override def withNamePrefix(name: String): Materializer = {
-    new LocalMaterializerImpl(system, settings, dispatchers, supervisor,
-      haveShutDown, flowNameCounter, namePrefix = name, optimizations)
-  }
-}
-
-object LocalMaterializerImpl {
-  case class MaterializedModule(val module: Module, val matValue: Any, inputs: Map[InPort, Subscriber[_]] = Map.empty[InPort, Subscriber[_]], outputs: Map[OutPort, Publisher[_]] = Map.empty[OutPort, Publisher[_]])
-
-  def toFoldModule(reduce: ReduceModule[Any]): Fold = {
-    val f = reduce.f
-    val aggregator = { (zero: Any, input: Any) =>
-      if (zero == null) {
-        input
-      } else {
-        f(zero, input)
-      }
-    }
-    new Fold(null, aggregator)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
deleted file mode 100644
index 47ed1f2..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/materializer/RemoteMaterializerImpl.scala
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.materializer
-
-import akka.actor.ActorSystem
-import akka.stream.ModuleGraph.Edge
-import akka.stream.gearpump.GearAttributes
-import akka.stream.gearpump.module.{GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
-import akka.stream.gearpump.task.{BalanceTask, BroadcastTask, GraphTask, SinkBridgeTask, SourceBridgeTask, UnZip2Task}
-import akka.stream.impl.Stages
-import akka.stream.impl.Stages.StageModule
-import akka.stream.impl.StreamLayout.Module
-import org.slf4j.LoggerFactory
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.op.{DataSinkOp, DataSourceOp, Direct, FlatMapOp, GroupByOp, MasterOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle, SlaveOp}
-import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
-import org.apache.gearpump.util.Graph
-
-/**
- * [[RemoteMaterializerImpl]] will materialize the [[Graph[Module, Edge]] to a Gearpump
- * Streaming Application.
- *
- * @param graph
- * @param system
- */
-class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
-
-  import RemoteMaterializerImpl._
-
-  type Clue = String
-  private implicit val actorSystem = system
-
-  private def uuid: String = {
-    java.util.UUID.randomUUID.toString
-  }
-
-  /**
-   * @return a mapping from Module to Materialized Processor Id.
-   */
-  def materialize: (StreamApplication, Map[Module, ProcessorId]) = {
-    val (opGraph, clues) = toOpGraph()
-    val app: StreamApplication = new StreamApp("app", system, UserConfig.empty, opGraph)
-    val processorIds = resolveClues(app, clues)
-
-    val updatedApp = updateJunctionConfig(processorIds, app)
-    (cleanClues(updatedApp), processorIds)
-  }
-
-  private def updateJunctionConfig(processorIds: Map[Module, ProcessorId], app: StreamApplication): StreamApplication = {
-    val config = junctionConfig(processorIds)
-
-    val dag = app.dag.mapVertex { vertex =>
-      val processorId = vertex.id
-      val newConf = vertex.taskConf.withConfig(config(processorId))
-      vertex.copy(taskConf = newConf)
-    }
-    new StreamApplication(app.name, app.inputUserConfig, dag)
-  }
-
-  /**
-   * Update junction config so that each GraphTask know its upstream and downstream.
-   * @param processorIds
-   * @return
-   */
-  private def junctionConfig(processorIds: Map[Module, ProcessorId]): Map[ProcessorId, UserConfig] = {
-    val updatedConfigs = graph.vertices.map { vertex =>
-      val processorId = processorIds(vertex)
-      vertex match {
-        case junction: JunctionModule =>
-          val inProcessors = junction.shape.inlets.map { inlet =>
-            val upstreamModule = graph.incomingEdgesOf(junction).find(_._2.to == inlet).map(_._1)
-            val upstreamProcessorId = processorIds(upstreamModule.get)
-            upstreamProcessorId
-          }.toList
-
-          val outProcessors = junction.shape.outlets.map { outlet =>
-            val downstreamModule = graph.outgoingEdgesOf(junction).find(_._2.from == outlet).map(_._3)
-            val downstreamProcessorId = downstreamModule.map(processorIds(_))
-            downstreamProcessorId.get
-          }.toList
-
-          (processorId, UserConfig.empty.withValue(GraphTask.OUT_PROCESSORS, outProcessors)
-            .withValue(GraphTask.IN_PROCESSORS, inProcessors))
-        case _ =>
-          (processorId, UserConfig.empty)
-      }
-    }.toMap
-    updatedConfigs
-  }
-
-  private def resolveClues(app: StreamApplication, clues: Map[Module, Clue]): Map[Module, ProcessorId] = {
-    clues.flatMap { kv =>
-      val (module, clue) = kv
-      val processorId = app.dag.vertices.find { processor =>
-        processor.taskConf.getString(clue).isDefined
-      }.map(_.id)
-      processorId.map((module, _))
-    }
-  }
-
-  private def cleanClues(app: StreamApplication): StreamApplication = {
-    val graph = app.dag.mapVertex { processor =>
-      val conf = cleanClue(processor.taskConf)
-      processor.copy(taskConf = conf)
-    }
-    new StreamApplication(app.name, app.inputUserConfig, graph)
-  }
-
-  private def cleanClue(conf: UserConfig): UserConfig = {
-    conf.filter { kv =>
-      kv._2 != RemoteMaterializerImpl.STAINS
-    }
-  }
-
-  private def toOpGraph(): (Graph[Op, OpEdge], Map[Module, Clue]) = {
-    var matValues = Map.empty[Module, Clue]
-    val opGraph = graph.mapVertex{ module =>
-      val name = uuid
-      val conf = UserConfig.empty.withString(name, RemoteMaterializerImpl.STAINS)
-      matValues += module -> name
-      val parallelism = GearAttributes.count(module.attributes)
-      val op = module match {
-        case source: SourceTaskModule[t] =>
-          val updatedConf = conf.withConfig(source.conf)
-          new DataSourceOp[t](source.source, parallelism, updatedConf, "source")
-        case sink: SinkTaskModule[t] =>
-          val updatedConf = conf.withConfig(sink.conf)
-          new DataSinkOp[t](sink.sink, parallelism, updatedConf, "sink")
-        case sourceBridge: SourceBridgeModule[_, _] =>
-          new ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source")
-        case processor: ProcessorModule[_, _, _] =>
-          val updatedConf = conf.withConfig(processor.conf)
-          new ProcessorOp(processor.processor, parallelism, updatedConf, "source")
-        case sinkBridge: SinkBridgeModule[_, _] =>
-          new ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
-        case groupBy: GroupByModule[t, g] =>
-          new GroupByOp[t, g](groupBy.groupBy, parallelism, "groupBy", conf)
-        case reduce: ReduceModule[Any] =>
-          reduceOp(reduce.f, conf)
-        case stage: StageModule =>
-          translateStage(stage, conf)
-        case fanIn: FanInModule =>
-          translateFanIn(fanIn, graph.incomingEdgesOf(fanIn), parallelism, conf)
-        case fanOut: FanOutModule =>
-          translateFanOut(fanOut, graph.outgoingEdgesOf(fanOut), parallelism, conf)
-      }
-
-      if (op == null) {
-        throw new UnsupportedOperationException(module.getClass.toString + " is not supported with RemoteMaterializer")
-      }
-      op
-    }.mapEdge[OpEdge]{(n1, edge, n2) =>
-      n2 match {
-        case master: MasterOp =>
-          Shuffle
-        case slave: SlaveOp[_] if n1.isInstanceOf[ProcessorOp[_]] =>
-          Shuffle
-        case slave: SlaveOp[_] =>
-          Direct
-      }
-    }
-    (opGraph, matValues)
-  }
-
-  private def translateStage(module: StageModule, conf: UserConfig): Op = {
-    module match {
-      case buffer: Stages.Buffer =>
-        //ignore the buffering operation
-        identity("buffer", conf)
-      case collect: Stages.Collect =>
-        collectOp(collect.pf, conf)
-      case concatAll: Stages.ConcatAll =>
-        //TODO:
-        null
-      case conflat: Stages.Conflate =>
-        conflatOp(conflat.seed, conflat.aggregate, conf)
-      case drop: Stages.Drop =>
-        dropOp(drop.n, conf)
-      case dropWhile: Stages.DropWhile =>
-        dropWhileOp(dropWhile.p, conf)
-      case expand: Stages.Expand =>
-        //TODO
-        null
-      case filter: Stages.Filter =>
-        filterOp(filter.p, conf)
-      case fold: Stages.Fold =>
-        foldOp(fold.zero, fold.f, conf)
-      case groupBy: Stages.GroupBy =>
-        //TODO
-        null
-      case grouped: Stages.Grouped =>
-        groupedOp(grouped.n, conf)
-      case _: Stages.Identity =>
-        identity("identity", conf)
-      case log: Stages.Log =>
-        logOp(log.name, log.extract, conf)
-      case map: Stages.Map =>
-        mapOp(map.f, conf)
-      case mapAsync: Stages.MapAsync =>
-        //TODO
-        null
-      case mapAsync: Stages.MapAsyncUnordered =>
-        //TODO
-        null
-      case flatMap: Stages.MapConcat =>
-        flatMapOp(flatMap.f, "mapConcat", conf)
-      case stage: MaterializingStageFactory =>
-        //TODO
-        null
-      case prefixAndTail: Stages.PrefixAndTail =>
-        //TODO
-        null
-      case recover: Stages.Recover =>
-        //TODO: we will just ignore this
-        identity("recover", conf)
-      case scan: Stages.Scan =>
-        scanOp(scan.zero, scan.f, conf)
-      case split: Stages.Split =>
-        //TODO
-        null
-      case stage: Stages.StageFactory =>
-        //TODO
-        null
-      case take: Stages.Take =>
-        takeOp(take.n, conf)
-      case takeWhile: Stages.TakeWhile =>
-        filterOp(takeWhile.p, conf)
-      case time: Stages.TimerTransform =>
-        //TODO
-        null
-    }
-  }
-
-  private def translateFanIn(
-    fanIn: FanInModule,
-    edges: List[(Module, Edge, Module)],
-    parallelism: Int,
-    conf: UserConfig): Op = {
-    fanIn match {
-      case merge: MergeModule[_] =>
-        MergeOp("merge", conf)
-      case mergePrefered: MergePreferredModule[_] =>
-        //TODO, support "prefer" merge
-        MergeOp("mergePrefered", conf)
-      case zip: ZipWithModule =>
-        //TODO: support zip module
-        null
-      case concat: ConcatModule[_] =>
-        //TODO: support concat module
-        null
-      case flexiMerge: FlexiMergeModule[_, _] =>
-        //TODO: Suport flexi merge module
-        null
-    }
-  }
-
-  private def translateFanOut(
-    fanOut: FanOutModule,
-    edges: List[(Module, Edge, Module)],
-    parallelism: Int,
-    conf: UserConfig): Op = {
-    fanOut match {
-      case unzip2: UnzipWith2Module[Any, Any, Any] =>
-        val updatedConf = conf.withValue(UnZip2Task.UNZIP2_FUNCTION, new UnZip2Task.UnZipFunction(unzip2.f))
-        new ProcessorOp(classOf[UnZip2Task], parallelism, updatedConf, "unzip")
-      case broadcast: BroadcastModule[_] =>
-        new ProcessorOp(classOf[BroadcastTask], parallelism, conf, "broadcast")
-      case broadcast: BalanceModule[_] =>
-        new ProcessorOp(classOf[BalanceTask], parallelism, conf, "balance")
-      case flexi: FlexiRouteImpl[_, _] =>
-        //TODO
-        null
-    }
-  }
-}
-
-object RemoteMaterializerImpl {
-  final val NotApplied: Any => Any = _ => NotApplied
-
-  def collectOp(collect: PartialFunction[Any, Any], conf: UserConfig): Op = {
-    flatMapOp({ data =>
-      collect.applyOrElse(data, NotApplied) match {
-        case NotApplied => None
-        case result: Any => Option(result)
-      }
-    }, "collect", conf)
-  }
-
-  def filterOp(filter: Any => Boolean, conf: UserConfig): Op = {
-    flatMapOp({ data =>
-      if (filter(data)) Option(data) else None
-    }, "filter", conf)
-  }
-
-  def reduceOp(reduce: (Any, Any) => Any, conf: UserConfig): Op = {
-    var result: Any = null
-    val flatMap = { elem: Any =>
-      if (result == null) {
-        result = elem
-      } else {
-        result = reduce(result, elem)
-      }
-      List(result)
-    }
-    flatMapOp(flatMap, "reduce", conf)
-  }
-
-  def identity(description: String, conf: UserConfig): Op = {
-    flatMapOp({ data =>
-      List(data)
-    }, description, conf)
-  }
-
-  def mapOp(map: Any => Any, conf: UserConfig): Op = {
-    flatMapOp({ data: Any =>
-      List(map(data))
-    }, "map", conf)
-  }
-
-  def flatMapOp(flatMap: Any => Iterable[Any], conf: UserConfig): Op = {
-    flatMapOp(flatMap, "flatmap", conf)
-  }
-
-  def flatMapOp(fun: Any => TraversableOnce[Any], description: String, conf: UserConfig): Op = {
-    FlatMapOp(fun, description, conf)
-  }
-
-  def conflatOp(seed: Any => Any, aggregate: (Any, Any) => Any, conf: UserConfig): Op = {
-    var agg: Any = null
-    val flatMap = { elem: Any =>
-      agg = if (agg == null) {
-        seed(elem)
-      } else {
-        aggregate(agg, elem)
-      }
-      List(agg)
-    }
-
-    flatMapOp(flatMap, "map", conf)
-  }
-
-  def foldOp(zero: Any, fold: (Any, Any) => Any, conf: UserConfig): Op = {
-    var aggregator: Any = zero
-    val map = { elem: Any =>
-      aggregator = fold(aggregator, elem)
-      List(aggregator)
-    }
-    flatMapOp(map, "fold", conf)
-  }
-
-  def groupedOp(count: Int, conf: UserConfig): Op = {
-    var left = count
-    val buf = {
-      val b = Vector.newBuilder[Any]
-      b.sizeHint(count)
-      b
-    }
-
-    val flatMap: Any => Iterable[Any] = { input: Any =>
-      buf += input
-      left -= 1
-      if (left == 0) {
-        val emit = buf.result()
-        buf.clear()
-        left = count
-        Some(emit)
-      } else {
-        None
-      }
-    }
-    flatMapOp(flatMap, conf: UserConfig)
-  }
-
-  def dropOp(number: Long, conf: UserConfig): Op = {
-    var left = number
-    val flatMap: Any => Iterable[Any] = { input: Any =>
-      if (left > 0) {
-        left -= 1
-        None
-      } else {
-        Some(input)
-      }
-    }
-    flatMapOp(flatMap, "drop", conf)
-  }
-
-  def dropWhileOp(drop: Any => Boolean, conf: UserConfig): Op = {
-    flatMapOp({ data =>
-      if (drop(data)) None else Option(data)
-    }, "dropWhile", conf)
-  }
-
-  def logOp(name: String, extract: Any => Any, conf: UserConfig): Op = {
-    val flatMap = { elem: Any =>
-      LoggerFactory.getLogger(name).info(s"Element: {${extract(elem)}}")
-      List(elem)
-    }
-    flatMapOp(flatMap, "log", conf)
-  }
-
-  def scanOp(zero: Any, f: (Any, Any) => Any, conf: UserConfig): Op = {
-    var aggregator = zero
-    var pushedZero = false
-
-    val flatMap = { elem: Any =>
-      aggregator = f(aggregator, elem)
-
-      if (pushedZero) {
-        pushedZero = true
-        List(zero, aggregator)
-      } else {
-        List(aggregator)
-      }
-    }
-    flatMapOp(flatMap, "scan", conf)
-  }
-
-  def takeOp(count: Long, conf: UserConfig): Op = {
-    var left: Long = count
-
-    val filter: Any => Iterable[Any] = { elem: Any =>
-      left -= 1
-      if (left > 0) Some(elem)
-      else if (left == 0) Some(elem)
-      else None
-    }
-    flatMapOp(filter, "take", conf)
-  }
-
-  /**
-   * We use stains to track how module maps to Processor
-   *
-   */
-  val STAINS = "track how module is fused to processor"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
deleted file mode 100644
index c5dfc9a..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/BridgeModule.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-import org.reactivestreams.{Publisher, Subscriber}
-
-/**
- *
- *
- * [[IN]] -> [[BridgeModule]] -> [[OUT]]
- * /
- * /
- * out of band data input or output channel [[MAT]]
- *
- *
- * [[BridgeModule]] is used as a bridge between different materializers.
- * Different [[akka.stream.Materializer]]s can use out of band channel to
- * exchange messages.
- *
- * For example:
- *
- * Remote Materializer
- * -----------------------------
- * |                            |
- * | BridgeModule -> RemoteSink |
- * |  /                         |
- * --/----------------------------
- * Local Materializer     /  out of band channel.
- * ----------------------/----
- * | Local              /    |
- * | Source ->  BridgeModule |
- * |                         |
- * ---------------------------
- *
- *
- * Typically [[BridgeModule]] is created implicitly as a temporary intermediate
- * module during materialization.
- *
- * However, user can still declare it explicitly. In this case, it means we have a
- * boundary Source or Sink module which accept out of band channel inputs or
- * outputs.
- *
- *
- * @tparam IN
- * @tparam OUT
- * @tparam MAT
- */
-abstract class BridgeModule[IN, OUT, MAT] extends FlowModule[IN, OUT, MAT] {
-  def attributes: Attributes
-  def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, MAT]
-
-  protected def newInstance: BridgeModule[IN, OUT, MAT]
-  override def carbonCopy: Module = newInstance
-}
-
-/**
- *
- * Bridge module which accept out of band channel Input
- * [[org.reactivestreams.Publisher]][IN].
- *
- *
- * [[SourceBridgeModule]] -> [[OUT]]
- * /|
- * /
- * out of band data input [[org.reactivestreams.Publisher]][IN]
- *
- * @see [[BridgeModule]]
- *
- * @param attributes
- * @tparam IN, input data type from out of band [[org.reactivestreams.Publisher]]
- * @tparam OUT out put data type to next module.
- */
-class SourceBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Subscriber[IN]] {
-  override protected def newInstance: BridgeModule[IN, OUT, Subscriber[IN]] = new SourceBridgeModule[IN, OUT](attributes)
-
-  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Subscriber[IN]] = {
-    new SourceBridgeModule(attributes)
-  }
-}
-
-/**
- *
- * Bridge module which accept out of band channel Output
- * [[org.reactivestreams.Subscriber]][OUT].
- *
- *
- * [[IN]] -> [[BridgeModule]]
- * \
- * \
- * \|
- * out of band data output [[org.reactivestreams.Subscriber]][OUT]
- *
- * @see [[BridgeModule]]
- *
- * @param attributes
- * @tparam IN, input data type from previous module
- * @tparam OUT out put data type to out of band subscriber
- */
-class SinkBridgeModule[IN, OUT](val attributes: Attributes = Attributes.name("sourceBridgeModule")) extends BridgeModule[IN, OUT, Publisher[OUT]] {
-  override protected def newInstance: BridgeModule[IN, OUT, Publisher[OUT]] = new SinkBridgeModule[IN, OUT](attributes)
-
-  override def withAttributes(attributes: Attributes): BridgeModule[IN, OUT, Publisher[OUT]] = {
-    new SinkBridgeModule[IN, OUT](attributes)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
deleted file mode 100644
index bc744f9..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/DummyModule.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.impl.{SinkModule, SourceModule}
-import akka.stream.{Attributes, MaterializationContext, SinkShape, SourceShape}
-import org.reactivestreams.{Publisher, Subscriber}
-
-/**
- * [[DummyModule]] is a set of special module to help construct a RunnableGraph,
- * so that all ports are closed.
- *
- * In runtime, [[DummyModule]] should be ignored during materialization.
- *
- * For example, if you have a [[BridgeModule]] which only accept the input
- * message from out of band channel, then you can use DummySource to fake
- * a Message Source Like this.
- *
- * [[DummySource]] -> [[BridgeModule]] -> Sink
- *                      /|
- *                     /
- *       out of band input message [[Publisher]]
- *
- *  After materialization, [[DummySource]] will be removed.
-
- *              [[BridgeModule]] -> Sink
- *                      /|
- *                     /
- *           [[akka.stream.impl.PublisherSource]]
- *
- *
- */
-trait DummyModule extends Module
-
-/**
- *
- *    [[DummySource]]-> [[BridgeModule]] -> Sink
- *                        /|
- *                       /
- *       out of band input message Source
- *
- * @param attributes
- * @param shape
- * @tparam Out
- */
-class DummySource[Out](val attributes: Attributes, shape: SourceShape[Out])
-  extends SourceModule[Out, Unit](shape) with DummyModule {
-
-  override def create(context: MaterializationContext): (Publisher[Out], Unit) = {
-    throw new UnsupportedOperationException()
-  }
-
-  override protected def newInstance(shape: SourceShape[Out]): SourceModule[Out, Unit] = {
-    new DummySource[Out](attributes, shape)
-  }
-
-  override def withAttributes(attr: Attributes): Module = {
-    new DummySource(attr, amendShape(attr))
-  }
-}
-
-/**
- *
- *    Source-> [[BridgeModule]] -> [[DummySink]]
- *                    \
- *                     \
- *                      \|
- *                   out of band output message [[Subscriber]]
- *
- * @param attributes
- * @param shape
- */
-class DummySink[IN](val attributes: Attributes, shape: SinkShape[IN])
-  extends SinkModule[IN, Unit](shape) with DummyModule {
-  override def create(context: MaterializationContext): (Subscriber[IN], Unit) = {
-    throw new UnsupportedOperationException()
-  }
-
-  override protected def newInstance(shape: SinkShape[IN]): SinkModule[IN, Unit] = {
-    new DummySink[IN](attributes, shape)
-  }
-
-  override def withAttributes(attr: Attributes): Module = {
-    new DummySink[IN](attr, amendShape(attr))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
deleted file mode 100644
index c4c78cc..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GearpumpTaskModule.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-import akka.stream.{Attributes, Inlet, Outlet, Shape, SinkShape, SourceShape}
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.Task
-
-/**
- * [[GearpumpTaskModule]] represent modules that can be materialized as Gearpump Tasks.
- *
- * This is specially designed for Gearpump runtime. It is not supposed to be used
- * for local materializer.
- *
- */
-trait GearpumpTaskModule extends Module
-
-/**
- * This is used to represent the Gearpump Data Source
- * @param source
- * @param conf
- * @param shape
- * @param attributes
- * @tparam T
- */
-final case class SourceTaskModule[T](
-    source: DataSource,
-    conf: UserConfig,
-    shape: SourceShape[T] = SourceShape[T](Outlet[T]("SourceTaskModule.out")),
-    attributes: Attributes = Attributes.name("SourceTaskModule"))
-  extends GearpumpTaskModule {
-
-  override def subModules: Set[Module] = Set.empty
-  override def withAttributes(attr: Attributes): Module = {
-    this.copy(shape = amendShape(attr), attributes = attr)
-  }
-  override def carbonCopy: Module = {
-    this.copy(shape = SourceShape(Outlet[T]("SourceTaskModule.out")))
-  }
-
-  override def replaceShape(s: Shape): Module =
-    if (s == shape) this
-    else throw new UnsupportedOperationException("cannot replace the shape of SourceTaskModule")
-
-  private def amendShape(attr: Attributes): SourceShape[T] = {
-    val thisN = attributes.nameOrDefault(null)
-    val thatN = attr.nameOrDefault(null)
-
-    if ((thatN eq null) || thisN == thatN) shape
-    else shape.copy(outlet = Outlet(thatN + ".out"))
-  }
-}
-
-/**
- * This is used to represent the Gearpump Data Sink
- * @param sink
- * @param conf
- * @param shape
- * @param attributes
- * @tparam IN
- */
-final case class SinkTaskModule[IN](
-    sink: DataSink,
-    conf: UserConfig,
-    shape: SinkShape[IN] = SinkShape[IN](Inlet[IN]("SinkTaskModule.in")),
-    attributes: Attributes = Attributes.name("SinkTaskModule"))
-  extends GearpumpTaskModule {
-
-  override def subModules: Set[Module] = Set.empty
-  override def withAttributes(attr: Attributes): Module = {
-    this.copy(shape = amendShape(attr), attributes = attr)
-  }
-  override def carbonCopy: Module = this.copy(shape = SinkShape(Inlet[IN]("SinkTaskModule.in")))
-
-  override def replaceShape(s: Shape): Module =
-    if (s == shape) this
-    else throw new UnsupportedOperationException("cannot replace the shape of SinkTaskModule")
-
-  private def amendShape(attr: Attributes): SinkShape[IN] = {
-    val thisN = attributes.nameOrDefault(null)
-    val thatN = attr.nameOrDefault(null)
-
-    if ((thatN eq null) || thisN == thatN) shape
-    else shape.copy(inlet = Inlet(thatN + ".out"))
-  }
-}
-
-/**
- * This is to represent the Gearpump Processor which has exact one input and one output
- * @param processor
- * @param conf
- * @param attributes
- * @tparam IN
- * @tparam OUT
- * @tparam Unit
- */
-case class ProcessorModule[IN, OUT, Unit](
-    processor: Class[_ <: Task],
-    conf: UserConfig,
-    val attributes: Attributes = Attributes.name("processorModule"))
-  extends FlowModule[IN, OUT, Unit] with GearpumpTaskModule {
-
-  override def carbonCopy: Module = newInstance
-
-  protected def newInstance: ProcessorModule[IN, OUT, Unit] = {
-    new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
-  }
-
-  override def withAttributes(attributes: Attributes): ProcessorModule[IN, OUT, Unit] = {
-    new ProcessorModule[IN, OUT, Unit](processor, conf, attributes)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
deleted file mode 100644
index e57a6f6..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/GroupByModule.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-
-/**
- *
- * Group the T value groupBy function
- *
- * @param f
- * @param attributes
- * @tparam T
- * @tparam Group
- */
-case class GroupByModule[T, Group](val groupBy: T => Group,
-    val attributes: Attributes = Attributes.name("groupByModule")) extends FlowModule[T, T, Unit] {
-
-  override def carbonCopy: Module = newInstance
-
-  protected def newInstance: GroupByModule[T, Group] = {
-    new GroupByModule[T, Group](groupBy, attributes)
-  }
-
-  override def withAttributes(attributes: Attributes): GroupByModule[T, Group] = {
-    new GroupByModule[T, Group](groupBy, attributes)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
deleted file mode 100644
index 926feb6..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/module/ReduceModule.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.module
-
-import akka.stream.Attributes
-import akka.stream.impl.FlowModule
-import akka.stream.impl.StreamLayout.Module
-
-/**
- *
- * Reduce Module
- *
- * @param f
- * @param attributes
- * @tparam T
- */
-case class ReduceModule[T](
-    val f: (T, T) => T, val attributes: Attributes = Attributes.name("reduceModule"))
-  extends FlowModule[T, T, Unit] {
-
-  override def carbonCopy: Module = newInstance
-
-  protected def newInstance: ReduceModule[T] = new ReduceModule[T](f, attributes)
-
-  override def withAttributes(attributes: Attributes): ReduceModule[T] = {
-    new ReduceModule[T](f, attributes)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
deleted file mode 100644
index 9cc46c9..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/scaladsl/Api.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.scaladsl
-
-import akka.stream.Attributes
-import akka.stream.gearpump.module.{DummySink, DummySource, GroupByModule, ProcessorModule, ReduceModule, SinkBridgeModule, SinkTaskModule, SourceBridgeModule, SourceTaskModule}
-import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
-import org.reactivestreams.{Publisher, Subscriber}
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.Task
-
-object GearSource {
-
-  /**
-   * Construct a Source which accepts out of band input messages.
-   *
-   *                   [[SourceBridgeModule]] -> Sink
-   *                          /
-   *                         /
-   *                        V
-   *                materialize to [[Subscriber]]
-   *                                   /|
-   *                                  /
-   *       upstream [[Publisher]] send out of band message
-   *
-   */
-  def bridge[IN, OUT]: Source[OUT, Subscriber[IN]] = {
-    val source = new Source(new DummySource[IN](Attributes.name("dummy"), Source.shape("dummy")))
-    val flow = new Flow[IN, OUT, Subscriber[IN]](new SourceBridgeModule[IN, OUT]())
-    source.viaMat(flow)(Keep.right)
-  }
-
-  /**
-   * Construct a Source from Gearpump [[DataSource]].
-   *
-   * [[SourceTaskModule]] -> downstream Sink
-   *
-   */
-  def from[OUT](source: DataSource): Source[OUT, Unit] = {
-    val taskSource = new Source[OUT, Unit](new SourceTaskModule(source, UserConfig.empty))
-    taskSource
-  }
-
-  /**
-   * Construct a Source from Gearpump [[org.apache.gearpump.streaming.Processor]].
-   *
-   * [[ProcessorModule]] -> downstream Sink
-   *
-   */
-  def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, Unit] = {
-    val source = new Source(new DummySource[Unit](Attributes.name("dummy"), Source.shape("dummy")))
-    val flow = Processor.apply[Unit, OUT](processor, conf)
-    source.viaMat(flow)(Keep.right)
-  }
-}
-
-object GearSink {
-
-  /**
-   * Construct a Sink which output messages to a out of band channel.
-   *
-   *   Souce ->   [[SinkBridgeModule]]
-   *                    \
-   *                     \|
-   *         materialize to [[Publisher]]
-   *                              \
-   *                               \
-   *                                \|
-   *       send out of band message to downstream [[Subscriber]]
-   *
-   */
-  def bridge[IN, OUT]: Sink[IN, Publisher[OUT]] = {
-    val sink = new Sink(new DummySink[OUT](Attributes.name("dummy"), Sink.shape("dummy")))
-    val flow = new Flow[IN, OUT, Publisher[OUT]](new SinkBridgeModule[IN, OUT]())
-    flow.to(sink)
-  }
-
-  /**
-   * Construct a Sink from Gearpump [[DataSink]].
-   *
-   * Upstream Source -> [[SinkTaskModule]]
-   *
-   */
-  def to[IN](sink: DataSink): Sink[IN, Unit] = {
-    val taskSink = new Sink[IN, Unit](new SinkTaskModule(sink, UserConfig.empty))
-    taskSink
-  }
-
-  /**
-   * Construct a Sink from Gearpump [[org.apache.gearpump.streaming.Processor]].
-   *
-   * Upstream Source -> [[ProcessorModule]]
-   *
-   */
-  def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = {
-    val sink = new Sink(new DummySink[Unit](Attributes.name("dummy"), Sink.shape("dummy")))
-    val flow = Processor.apply[IN, Unit](processor, conf)
-    flow.to(sink)
-  }
-}
-
-/**
- *
- * GroupBy will divide the main input stream to a set of sub-streams.
- * This is a work-around to bypass the limitation of official API Flow.groupBy
- *
- *
- * For example, to do a word count, we can write code like this:
- *
- * case class KV(key: String, value: String)
- * case class Count(key: String, count: Int)
- *
- * val flow: Flow[KV] = GroupBy[KV](foo).map{ kv =>
- *   Count(kv.key, 1)
- * }.fold(Count(null, 0)){(base, add) =>
- *   Count(add.key, base.count + add.count)
- * }.log("count of current key")
- * .flatten()
- * .to(sink)
- *
- * map, fold will transform data on all sub-streams, If there are 10 groups,
- * then there will be 10 sub-streams, and for each sub-stream, there will be
- * a map and fold.
- *
- * flatten will collect all sub-stream into the main stream,
- *
- * sink will only operate on the main stream.
- *
- */
-object GroupBy {
-  def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] = {
-    new Flow[T, T, Unit](new GroupByModule(groupBy))
-  }
-}
-
-/**
- * Aggregate on the data.
- *
- * val flow = Reduce({(a: Int, b: Int) => a + b})
- *
- *
- */
-object Reduce {
-  def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] = {
-    new Flow[T, T, Unit](new ReduceModule(reduce))
-  }
-}
-
-/**
- * Create a Flow by providing a Gearpump Processor class and configuration
- *
- *
- */
-object Processor {
-  def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, Out, Unit] = {
-    new Flow[In, Out, Unit](new ProcessorModule[In, Out, Unit](processor, conf))
-  }
-}
-
-object Implicits {
-
-  /**
-   * Help util to support reduce and groupBy
-   */
-  implicit class SourceOps[T, Mat](source: Source[T, Mat]) {
-
-    //TODO It is named as groupBy2 to avoid conflict with built-in
-    // groupBy. Eventually, we think the built-in groupBy should
-    // be replace with this implementation.
-    def groupBy2[Group](groupBy: T => Group): Source[T, Mat] = {
-      val stage = GroupBy.apply(groupBy)
-      source.via[T, Unit](stage)
-    }
-
-    def reduce(reduce: (T, T) => T): Source[T, Mat] = {
-      val stage = Reduce.apply(reduce)
-      source.via[T, Unit](stage)
-    }
-
-    def process[R](processor: Class[_ <: Task], conf: UserConfig): Source[R, Mat] = {
-      val stage = Processor.apply[T, R](processor, conf)
-      source.via(stage)
-    }
-  }
-
-  /**
-   * Help util to support reduce and groupBy
-   */
-  implicit class FlowOps[IN, OUT, Mat](flow: Flow[IN, OUT, Mat]) {
-    def groupBy2[Group](groupBy: OUT => Group): Flow[IN, OUT, Mat] = {
-      val stage = GroupBy.apply(groupBy)
-      flow.via(stage)
-    }
-
-    def reduce(reduce: (OUT, OUT) => OUT): Flow[IN, OUT, Mat] = {
-      val stage = Reduce.apply(reduce)
-      flow.via(stage)
-    }
-
-    def process[R](processor: Class[_ <: Task], conf: UserConfig): Flow[IN, R, Mat] = {
-      val stage = Processor.apply[OUT, R](processor, conf)
-      flow.via(stage)
-    }
-  }
-
-  /**
-   * Help util to support groupByKey and sum
-   */
-  implicit class KVSourceOps[K, V, Mat](source: Source[(K, V), Mat]) {
-
-    /**
-     * if it is a KV Pair, we can group the KV pair by the key.
-     * @return
-     */
-    def groupByKey: Source[(K, V), Mat] = {
-      val stage = GroupBy.apply(getTupleKey[K, V])
-      source.via(stage)
-    }
-
-    /**
-     * Does sum on values
-     *
-     * Before doing this, you need to do groupByKey to group same key together
-     * , otherwise, it will do the sum no matter what current key is.
-     */
-    def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] = {
-      val stage = Reduce.apply(sumByKey[K, V](numeric))
-      source.via(stage)
-    }
-  }
-
-  /**
-   * Helper util to support groupByKey and sum
-   */
-  implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {
-
-    /**
-     * If it is a KV Pair, we can group the KV pair by the key.
-     *
-     */
-    def groupByKey: Flow[(K, V), (K, V), Mat] = {
-      val stage = GroupBy.apply(getTupleKey[K, V])
-      flow.via(stage)
-    }
-
-    /**
-     * do sum on values
-     *
-     * Before doing this, you need to do groupByKey to group same key together
-     * , otherwise, it will do the sum no matter what current key is.
-     *
-     */
-    def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] = {
-      val stage = Reduce.apply(sumByKey[K, V](numeric))
-      flow.via(stage)
-    }
-  }
-
-  private def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
-
-  private def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V] =
-    (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
deleted file mode 100644
index 2eb0612..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BalanceTask.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.TaskContext
-
-class BalanceTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
-
-  val sizeOfOutputs = sizeOfOutPorts
-  var index = 0
-
-  override def onNext(msg: Message): Unit = {
-    output(index, msg)
-    index += 1
-    if (index == sizeOfOutputs) {
-      index = 0
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
deleted file mode 100644
index 925bf21..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/BroadcastTask.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.TaskContext
-
-class BroadcastTask(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
-  override def onNext(msg: Message): Unit = {
-    context.output(msg)
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
deleted file mode 100644
index 9a4e24e..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/GraphTask.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import akka.stream.gearpump.task.GraphTask.{Index, PortId}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskWrapper}
-
-class GraphTask(inputTaskContext: TaskContext, userConf: UserConfig)
-  extends Task(inputTaskContext, userConf) {
-
-  private val context = inputTaskContext.asInstanceOf[TaskWrapper]
-  private val outMapping = portsMapping(userConf.getValue[List[ProcessorId]](
-    GraphTask.OUT_PROCESSORS).get)
-  private val inMapping = portsMapping(userConf.getValue[List[ProcessorId]](
-    GraphTask.IN_PROCESSORS).get)
-
-  val sizeOfOutPorts = outMapping.keys.size
-  val sizeOfInPorts = inMapping.keys.size
-
-  private def portsMapping(processors: List[ProcessorId]): Map[PortId, Index] = {
-    val portToProcessor = processors.zipWithIndex.map { kv =>
-      (kv._2, kv._1)
-    }.toMap
-
-    val processorToIndex = processors.sorted.zipWithIndex.toMap
-
-    val portToIndex = portToProcessor.map { kv =>
-      val (outlet, processorId) = kv
-      val index = processorToIndex(processorId)
-      (outlet, index)
-    }
-    portToIndex
-  }
-
-  def output(outletId: Int, msg: Message): Unit = {
-    context.output(outMapping(outletId), msg)
-  }
-
-  override def onStart(startTime: StartTime): Unit = {}
-
-  override def onStop(): Unit = {}
-}
-
-object GraphTask {
-  val OUT_PROCESSORS = "akka.stream.gearpump.task.outprocessors"
-  val IN_PROCESSORS = "akka.stream.gearpump.task.inprocessors"
-
-  type PortId = Int
-  type Index = Int
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
deleted file mode 100644
index b681852..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SinkBridgeTask.scala
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import java.util
-import java.util.concurrent.TimeUnit
-
-import akka.actor.Actor.Receive
-import akka.actor.{Actor, ActorRef, ActorSystem, Props}
-import akka.stream.gearpump.task.SinkBridgeTask.RequestMessage
-import akka.util.Timeout
-import org.reactivestreams.{Publisher, Subscriber, Subscription}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Bridge Task when data flow is from remote Gearpump Task to local Akka-Stream Module
- *
- *
- * upstream [[Task]] -> [[SinkBridgeTask]]
- *                         \              Remote Cluster
- * -------------------------\----------------------
- *                           \            Local JVM
- *                            \|
- *                       Akka Stream [[Subscriber]]
- *
- */
-class SinkBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-  import taskContext.taskId
-
-  val queue = new util.LinkedList[Message]()
-  var subscriber: ActorRef = null
-
-  var request: Int = 0
-
-  override def onStart(startTime: StartTime): Unit = {}
-
-  override def onNext(msg: Message): Unit = {
-    queue.add(msg)
-    trySendingData()
-  }
-
-  override def onStop(): Unit = {}
-
-  private def trySendingData(): Unit = {
-    if (subscriber != null) {
-      (0 to request).map(_ => queue.poll()).filter(_ != null).foreach { msg =>
-        subscriber ! msg.msg
-        request -= 1
-      }
-    }
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case RequestMessage(n) =>
-      this.subscriber = sender
-      LOG.info("the downstream has requested " + n + " messages from " + subscriber)
-      request += n.toInt
-      trySendingData()
-    case msg =>
-      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString)
-  }
-}
-
-object SinkBridgeTask {
-
-  case class RequestMessage(number: Int)
-
-  class SinkBridgeTaskClient(system: ActorSystem, context: ClientContext, appId: Int, processorId: ProcessorId) extends Publisher[AnyRef] with Subscription {
-    private val taskId = TaskId(processorId, index = 0)
-
-    private val LOG = LogUtil.getLogger(getClass)
-
-    private var actor: ActorRef = null
-    import system.dispatcher
-
-    private val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container =>
-      // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task)
-      container.task
-    }
-
-    override def subscribe(subscriber: Subscriber[_ >: AnyRef]): Unit = {
-      this.actor = system.actorOf(Props(new ClientActor(subscriber)))
-      subscriber.onSubscribe(this)
-    }
-
-    override def cancel(): Unit = Unit
-
-    private implicit val timeout = Timeout(5, TimeUnit.SECONDS)
-
-    override def request(l: Long): Unit = {
-      task.foreach { task =>
-        task.tell(RequestMessage(l.toInt), actor)
-      }
-    }
-  }
-
-  class ClientActor(subscriber: Subscriber[_ >: AnyRef]) extends Actor {
-    def receive: Receive = {
-      case result: AnyRef => subscriber.onNext(result)
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
deleted file mode 100644
index ccbd350..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/SourceBridgeTask.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import scala.concurrent.ExecutionContext
-
-import akka.actor.Actor.Receive
-import akka.stream.gearpump.task.SourceBridgeTask.{AkkaStreamMessage, Complete, Error}
-import org.reactivestreams.{Subscriber, Subscription}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.appmaster.AppMaster.{LookupTaskActorRef, TaskActorRef}
-import org.apache.gearpump.streaming.task.{StartTime, Task, TaskContext, TaskId}
-
-/**
- * Bridge Task when data flow is from local Akka-Stream Module to remote Gearpump Task
- *
- *
- *
- *      [[SourceBridgeTask]]   --> downstream [[Task]]
- *                 /|                Remote Cluster
- * ---------------/--------------------------------
- *               /                    Local JVM
- *    Akka Stream [[org.reactivestreams.Publisher]]
- *
- */
-class SourceBridgeTask(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-  import taskContext.taskId
-
-  override def onStart(startTime: StartTime): Unit = {}
-
-  override def onNext(msg: Message): Unit = {
-    LOG.info("AkkaStreamSource receiving message " + msg)
-  }
-
-  override def onStop(): Unit = {}
-
-  override def receiveUnManagedMessage: Receive = {
-    case Error(ex) =>
-      LOG.error("the stream has error", ex)
-    case AkkaStreamMessage(msg) =>
-      LOG.error("we have received message from akka stream source: " + msg)
-      taskContext.output(Message(msg, System.currentTimeMillis()))
-    case Complete(description) =>
-      LOG.error("the stream is completed: " + description)
-    case msg =>
-      LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString)
-  }
-}
-
-object SourceBridgeTask {
-  case class Error(ex: java.lang.Throwable)
-
-  case class Complete(description: String)
-
-  case class AkkaStreamMessage(msg: AnyRef)
-
-  class SourceBridgeTaskClient[T <: AnyRef](ec: ExecutionContext, context: ClientContext, appId: Int, processorId: ProcessorId) extends Subscriber[T] {
-    val taskId = TaskId(processorId, 0)
-    var subscription: Subscription = null
-    implicit val dispatcher = ec
-
-    val task = context.askAppMaster[TaskActorRef](appId, LookupTaskActorRef(taskId)).map { container =>
-      // println("Successfully resolved taskRef for taskId " + taskId + ", " + container.task)
-      container.task
-    }
-
-    override def onError(throwable: Throwable): Unit = {
-      task.map(task => task ! Error(throwable))
-    }
-
-    override def onSubscribe(subscription: Subscription): Unit = {
-      this.subscription = subscription
-      task.map(task => subscription.request(1))
-    }
-
-    override def onComplete(): Unit = {
-      task.map(task => task ! Complete("the upstream is completed"))
-    }
-
-    override def onNext(t: T): Unit = {
-      task.map { task =>
-        task ! AkkaStreamMessage(t)
-      }
-      subscription.request(1)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
deleted file mode 100644
index 78fabbe..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/task/UnZip2Task.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.task
-
-import akka.stream.gearpump.task.UnZip2Task.UnZipFunction
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.TaskContext
-
-class UnZip2Task(context: TaskContext, userConf: UserConfig) extends GraphTask(context, userConf) {
-
-  val unzip = userConf.getValue[UnZipFunction](UnZip2Task.UNZIP2_FUNCTION)(context.system).get.unzip
-
-  override def onNext(msg: Message): Unit = {
-    val message = msg.msg
-    val time = msg.timestamp
-    val pair = unzip(message)
-    val (a, b) = pair
-    output(0, Message(a.asInstanceOf[AnyRef], time))
-    output(1, Message(b.asInstanceOf[AnyRef], time))
-  }
-}
-
-object UnZip2Task {
-  class UnZipFunction(val unzip: Any => (Any, Any)) extends Serializable
-
-  val UNZIP2_FUNCTION = "akka.stream.gearpump.task.unzip2.function"
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala b/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
deleted file mode 100644
index c774fc7..0000000
--- a/experiments/akkastream/src/main/scala/akka/stream/gearpump/util/MaterializedValueOps.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package akka.stream.gearpump.util
-
-import akka.stream.impl.StreamLayout.{Atomic, Combine, Ignore, MaterializedValueNode, Module, Transform}
-
-class MaterializedValueOps(mat: MaterializedValueNode) {
-  def resolve[Mat](materializedValues: Map[Module, Any]): Mat = {
-    def resolveMaterialized(mat: MaterializedValueNode, materializedValues: Map[Module, Any]): Any = mat match {
-      case Atomic(m) => materializedValues.getOrElse(m, ())
-      case Combine(f, d1, d2) => f(resolveMaterialized(d1, materializedValues), resolveMaterialized(d2, materializedValues))
-      case Transform(f, d) => f(resolveMaterialized(d, materializedValues))
-      case Ignore => ()
-    }
-    resolveMaterialized(mat, materializedValues).asInstanceOf[Mat]
-  }
-}
-
-object MaterializedValueOps {
-  def apply(mat: MaterializedValueNode): MaterializedValueOps = new MaterializedValueOps(mat)
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4fe5458f/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
new file mode 100644
index 0000000..016a7b2
--- /dev/null
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/GearAttributes.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.akkastream
+
+import akka.stream.Attributes
+import akka.stream.Attributes.Attribute
+
+object GearAttributes {
+  /**
+   * Define how many parallel instance we want to use to run this module
+   * @param count Int
+   * @return
+   */
+  def count(count: Int): Attributes = Attributes(ParallismAttribute(count))
+
+  /**
+   * Define we want to render this module locally.
+   * @return
+   */
+  def local: Attributes = Attributes(LocationAttribute(Local))
+
+  /**
+   * Define we want to render this module remotely
+   * @return
+   */
+  def remote: Attributes = Attributes(LocationAttribute(Remote))
+
+  /**
+   * Get the effective location settings if child override the parent
+   * setttings.
+   *
+   * @param attrs Attributes
+   * @return
+   */
+  def location(attrs: Attributes): Location = {
+    attrs.attributeList.foldLeft(Local: Location) { (s, attr) =>
+      attr match {
+        case LocationAttribute(location) => location
+        case other => s
+      }
+    }
+  }
+
+  /**
+   * get effective parallelism settings if child override parent.
+   * @param attrs Attributes
+   * @return
+   */
+  def count(attrs: Attributes): Int = {
+    attrs.attributeList.foldLeft(1) { (s, attr) =>
+      attr match {
+        case ParallismAttribute(count) => count
+        case other => s
+      }
+    }
+  }
+
+  /**
+   * Where we want to render the module
+   */
+  sealed trait Location
+  object Local extends Location
+  object Remote extends Location
+
+  final case class LocationAttribute(tag: Location) extends Attribute
+
+  /**
+   * How many parallel instance we want to use for this module.
+   *
+   * @param parallelism Int
+   */
+  final case class ParallismAttribute(parallelism: Int) extends Attribute
+}