You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:43 UTC

[36/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala
new file mode 100644
index 0000000..9ac3cc9
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+/**
+ * Contract exposing a single parameterless `execute` action.
+ *
+ * NOTE(review): presumably implemented by whatever a topology compiler's
+ * `buildTopology` returns - confirm against the implementors.
+ */
+trait AbstractTopologyExecutor {
+  /** Abstract action; declared without a result type, so the result type is Unit. */
+  def execute
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala
new file mode 100644
index 0000000..cb6ac4f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala
@@ -0,0 +1,75 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import java.util
+
+import org.apache.eagle.alert.dedup.{AlertEmailDeduplicationExecutor, AlertEntityDeduplicationExecutor}
+import org.apache.eagle.alert.notification.AlertNotificationExecutor
+import org.apache.eagle.alert.persist.AlertPersistExecutor
+import org.apache.eagle.executor.AlertExecutor
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
+ *
+ * <br/><br/>
+ * Explanations for programId, alertExecutorId and policy<br/><br/>
+ * - programId - distributed or single-process program for example one storm topology<br/>
+ * - alertExecutorId - one process/thread which executes multiple policies<br/>
+ * - policy - some rules to be evaluated<br/>
+ *
+ * <br/>
+ *
+ * Normally the mapping is as follows:
+ * <pre>
+ * programId (1:N) alertExecutorId
+ * alertExecutorId (1:N) policy
+ * </pre>
+ */
+
+object AlertExecutorConsumerUtils {
+  private val LOG: Logger = LoggerFactory.getLogger(AlertExecutorConsumerUtils.getClass)
+
+  /**
+   * Creates the shared downstream alert consumers and records the edges that link them
+   * to the given alert executors.
+   *
+   * Four consumers are created (entity dedup, persist, email dedup, notification) and the
+   * following edges are appended to `toBeAddedEdges`:
+   * <pre>
+   * entity dedup -> persist
+   * email dedup  -> notification
+   * each alert stream producer -> entity dedup
+   * each alert stream producer -> email dedup
+   * </pre>
+   *
+   * The alert definition DAO is taken from the first alert executor. When
+   * `alertStreamProducers` is empty there is no executor to take it from, so a warning is
+   * logged and no edge is added (previously this failed with a NoSuchElementException).
+   *
+   * @param toBeAddedEdges       buffer that receives the new edges; mutated in place
+   * @param alertStreamProducers producers whose mapper is an AlertExecutor; each one must be a
+   *                             FlatMapProducer, otherwise a ClassCastException is thrown
+   */
+  def setupAlertConsumers(toBeAddedEdges: ListBuffer[StreamConnector], alertStreamProducers: List[StreamProducer]): Unit = {
+    if (alertStreamProducers.isEmpty) {
+      LOG.warn("No alert stream producers given, skip setting up alert consumers")
+    } else {
+      val alertExecutors = alertStreamProducers.map(alertExecutorOf)
+
+      // The executor constructors take a java.util.List, so the ids are collected into one directly.
+      val alertExecutorIdList: java.util.List[String] = new util.ArrayList[String]()
+      alertExecutors.foreach(executor => alertExecutorIdList.add(executor.getAlertExecutorId))
+
+      // Safe: the list is known to be non-empty in this branch.
+      val alertDefDao = alertExecutors.head.getAlertDefinitionDao
+
+      val entityDedupExecutor: AlertEntityDeduplicationExecutor = new AlertEntityDeduplicationExecutor(alertExecutorIdList, alertDefDao)
+      val emailDedupExecutor: AlertEmailDeduplicationExecutor = new AlertEmailDeduplicationExecutor(alertExecutorIdList, alertDefDao)
+      val notificationExecutor: AlertNotificationExecutor = new AlertNotificationExecutor(alertExecutorIdList, alertDefDao)
+      val persistExecutor: AlertPersistExecutor = new AlertPersistExecutor
+
+      // Ids are drawn in the same order as before so generated producer ids do not change.
+      val entityDedupStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),entityDedupExecutor)
+      val persistStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),persistExecutor)
+      val emailDedupStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),emailDedupExecutor)
+      val notificationStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(),notificationExecutor)
+      toBeAddedEdges += StreamConnector(entityDedupStreamProducer, persistStreamProducer)
+      toBeAddedEdges += StreamConnector(emailDedupStreamProducer, notificationStreamProducer)
+
+      alertStreamProducers.foreach(sp => {
+        toBeAddedEdges += StreamConnector(sp, entityDedupStreamProducer)
+        toBeAddedEdges += StreamConnector(sp, emailDedupStreamProducer)
+      })
+    }
+  }
+
+  /** Extracts the AlertExecutor that serves as the mapper of the given flat-map producer. */
+  private def alertExecutorOf(producer: StreamProducer): AlertExecutor =
+    producer.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor]
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
new file mode 100644
index 0000000..488d52a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import backtype.storm.topology.base.BaseRichSpout
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+object ExecutionEnvironmentFactory{
+
+  /** Creates a Storm execution environment backed by the supplied configuration. */
+  def getStorm(config : Config): StormExecutionEnvironment = new StormExecutionEnvironment(config)
+
+  /** Creates a Storm execution environment from the configuration returned by `ConfigFactory.load()`. */
+  def getStorm:StormExecutionEnvironment = getStorm(ConfigFactory.load())
+}
+
+/**
+ * Base class for execution environments.
+ *
+ * @param config configuration passed in by the creator; this base class itself does not read it
+ */
+abstract class ExecutionEnvironment(config : Config){
+  /** Abstract; supplied by subclasses. Declared in procedure syntax, so the result type is Unit. */
+  def execute()
+}
+
+/**
+ * Execution environment that owns a DAG of stream producers, expands it step by step and
+ * hands the result to the Storm topology compiler.
+ *
+ * @param config configuration passed to every expansion step, to the topology compiler and
+ *               to the sources created through `newSource`
+ */
+class StormExecutionEnvironment(config: Config) extends ExecutionEnvironment(config){
+  val LOG = LoggerFactory.getLogger(classOf[StormExecutionEnvironment])
+  val dag = new DirectedAcyclicGraph[StreamProducer, StreamConnector](classOf[StreamConnector])
+
+  /**
+   * Applies the alert, union, group-by, name and parallelism expansions to the DAG in that
+   * order (logging the graph before the first and after each one), transforms the DAG into
+   * its Storm form, builds the topology and executes it.
+   */
+  override def execute() : Unit = {
+    logGraph("initial graph:\n")
+
+    new StreamAlertExpansion(config).expand(dag)
+    logGraph("after StreamAlertExpansion graph:")
+
+    new StreamUnionExpansion(config).expand(dag)
+    logGraph("after StreamUnionExpansion graph:")
+
+    new StreamGroupbyExpansion(config).expand(dag)
+    logGraph("after StreamGroupbyExpansion graph:")
+
+    new StreamNameExpansion(config).expand(dag)
+    logGraph("after StreamNameExpansion graph:")
+
+    new StreamParallelismConfigExpansion(config).expand(dag)
+    logGraph("after StreamParallelismConfigExpansion graph:")
+
+    val stormDag = StormStreamDAGTransformer.transform(dag)
+    StormTopologyCompiler(config, stormDag).buildTopology.execute
+  }
+
+  /** Logs the given headline and then prints the current state of the DAG. */
+  private def logGraph(headline: String): Unit = {
+    LOG.info(headline)
+    GraphPrinter.print(dag)
+  }
+
+  /**
+   * Wraps the spout in a new source producer, attaches this environment's config and DAG
+   * to it and registers it as a vertex of the DAG.
+   *
+   * @param source spout that feeds the new source producer
+   * @return the newly registered source producer
+   */
+  def newSource(source: BaseRichSpout): StormSourceProducer ={
+    val producer = StormSourceProducer(UniqueId.incrementAndGetId(), source)
+    producer.config = config
+    producer.graph = dag
+    dag.addVertex(producer)
+    producer
+  }
+
+  /** Same as `newSource(BaseRichSpout)`, using the spout obtained from the provider for this config. */
+  def newSource(sourceProvider: AbstractStormSpoutProvider):StormSourceProducer = newSource(sourceProvider.getSpout(config))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala
new file mode 100644
index 0000000..d31759b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.task.{OutputCollector, TopologyContext}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.tuple.{Fields, Tuple}
+import org.slf4j.LoggerFactory
+
+/**
+ * Storm bolt that forwards a tuple unchanged when its first value satisfies `fn` and drops
+ * it otherwise.
+ *
+ * Every input tuple is acked, whether it is forwarded or dropped. Previously dropped tuples
+ * were never acked, so Storm would eventually time them out and report them as failed.
+ *
+ * @param fn predicate applied to the first value of each input tuple
+ * @tparam T expected type of the first value; erased at runtime, so it is not verified here
+ */
+case class FilterBoltWrapper[T](fn : T => Boolean) extends BaseRichBolt{
+  val LOG = LoggerFactory.getLogger(FilterBoltWrapper.getClass)
+  var _collector : OutputCollector = null
+
+  /** Stores the collector used later for emitting and acking. */
+  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
+    _collector = collector
+  }
+
+  /**
+   * Emits the input's values, anchored to the input, when the predicate accepts the first
+   * value; always acks the input afterwards. An exception thrown by `fn` propagates and the
+   * tuple is then not acked.
+   */
+  override def execute(input : Tuple): Unit = {
+    // A `case v: T` type pattern cannot be checked because T is erased, so cast explicitly.
+    // A wrongly typed value surfaces as a ClassCastException inside fn, as it did before.
+    val value = input.getValue(0).asInstanceOf[T]
+    if (fn(value)) {
+      _collector.emit(input, input.getValues)
+    }
+    // Filtered-out tuples are fully processed too and must be acked, otherwise they time out.
+    _collector.ack(input)
+  }
+
+  /** Declares a single output field named FIELD_PREFIX followed by "0". */
+  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
+    declarer.declare(new Fields(OutputFieldNameConst.FIELD_PREFIX + "0"))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala
new file mode 100644
index 0000000..1925a89
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala
@@ -0,0 +1,37 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConversions._
+
+object GraphPrinter {
+  private val LOG = LoggerFactory.getLogger(GraphPrinter.getClass)
+
+  /**
+   * Logs one INFO line per edge of the graph, visiting vertices in the order of the graph's
+   * iterator and, for each vertex, its outgoing edges. Each line shows both end points with
+   * their parallelism and the edge's group-by fields.
+   *
+   * @param dag graph whose edges are logged
+   */
+  def print(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit ={
+    for {
+      vertex <- dag.iterator()
+      edge <- dag.outgoingEdgesOf(vertex)
+    } {
+      LOG.info(s"${edge.from}{${edge.from.parallelism}} => ${edge.to}{${edge.to.parallelism}} with groupByFields ${edge.groupByFields}")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala
new file mode 100644
index 0000000..8ebfd7b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.task.{OutputCollector, TopologyContext}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.tuple.{Fields, Tuple}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Storm bolt that delegates tuple processing to a Java stream executor.
+ *
+ * @param worker executor that is initialised in `prepare`, receives the values of every
+ *               input tuple and supplies the output field names
+ */
+case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[EagleTuple]) extends BaseRichBolt{
+  // Log under this class; the logger used to be created for StormBoltWrapper by mistake.
+  val LOG = LoggerFactory.getLogger(classOf[JavaStormBoltWrapper])
+  var _collector : OutputCollector = null
+
+  /** Stores the collector and initialises the worker. */
+  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
+    _collector = collector
+    worker.init
+  }
+
+  /**
+   * Passes the input's values to the worker, emits every tuple the worker collects
+   * (anchored to the input) and finally acks the input. An exception thrown by the worker
+   * propagates and the input is then not acked.
+   */
+  override def execute(input : Tuple): Unit ={
+    worker.flatMap(input.getValues, new Collector[EagleTuple](){
+      def collect(t: EagleTuple): Unit ={
+        _collector.emit(input, t.getList.asJava)
+      }
+    })
+    _collector.ack(input)
+  }
+
+  /** Declares the field names reported by the worker as this bolt's output fields. */
+  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
+    val fields = worker.fields
+    LOG.info("output fields for worker " + worker + " : " + fields.toList)
+    declarer.declare(new Fields(fields:_*))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala
new file mode 100644
index 0000000..99fa32a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.task.{OutputCollector, TopologyContext}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.tuple.{Fields, Tuple}
+import org.slf4j.LoggerFactory
+
+/**
+ * @since  9/29/15
+ */
+case class MapBoltWrapper[T,R](num: Int, fn: T => R) extends BaseRichBolt {
+  // Log under this class; the logger used to be created for FilterBoltWrapper by mistake.
+  val LOG = LoggerFactory.getLogger(classOf[MapBoltWrapper[_, _]])
+  var _collector : OutputCollector = null
+
+  /**
+   * Declares `num` output fields named FIELD_PREFIX followed by the indices 0 until num.
+   * A non-positive `num` declares no fields.
+   */
+  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+    val fields = new util.ArrayList[String]()
+    (0 until num).foreach(i => fields.add(OutputFieldNameConst.FIELD_PREFIX + i))
+    declarer.declare(new Fields(fields))
+  }
+
+  /**
+   * Packs the input's 1 to 4 values into the Scala tuple of matching arity, applies `fn`
+   * and emits the result anchored to the input: a Tuple1..Tuple4 result is spread over one
+   * to four output values, any other result is emitted as a single value. The input is
+   * acked afterwards.
+   *
+   * @throws IllegalArgumentException if the input does not hold between 1 and 4 values
+   */
+  override def execute(input: Tuple): Unit = {
+    val values: AnyRef = input.size() match {
+      case 1 => scala.Tuple1(input.getValue(0))
+      case 2 => scala.Tuple2(input.getValue(0), input.getValue(1))
+      case 3 => scala.Tuple3(input.getValue(0), input.getValue(1), input.getValue(2))
+      case 4 => scala.Tuple4(input.getValue(0), input.getValue(1), input.getValue(2), input.getValue(3))
+      case unsupported =>
+        throw new IllegalArgumentException("MapBoltWrapper supports input tuples with 1 to 4 values, but got " + unsupported)
+    }
+    // T is erased at runtime; a mismatch only surfaces inside fn.
+    val output = fn(values.asInstanceOf[T])
+    output match {
+      case scala.Tuple1(a) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef]))
+      case scala.Tuple2(a, b) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef]))
+      case scala.Tuple3(a, b, c) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef]))
+      case scala.Tuple4(a, b, c, d) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef], d.asInstanceOf[AnyRef]))
+      // Anything that is not a Tuple1..Tuple4 is emitted as one value.
+      case a => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef]))
+    }
+    _collector.ack(input)
+  }
+
+  /** Stores the collector used later for emitting and acking. */
+  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
+    _collector = collector
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala
new file mode 100644
index 0000000..8b06322
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+/**
+ * Chooses the display name of a stream producer.
+ *
+ * @param producer producer whose name is selected
+ */
+case class NodeNameSelector(producer : StreamProducer) {
+  /** Returns the producer's `name`, or its `toString` when the name is null. */
+  def getName : String = Option(producer.name).getOrElse(producer.toString)
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala
new file mode 100644
index 0000000..64659b7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+/** Constants used when naming declared output fields. */
+object OutputFieldNameConst {
+  /** Prefix to which a field index is appended, for example "f0". */
+  val FIELD_PREFIX: String = "f"
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala
new file mode 100644
index 0000000..aca3b5b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import backtype.storm.spout.SpoutOutputCollector
+import backtype.storm.task.TopologyContext
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichSpout
+import backtype.storm.tuple.Fields
+
+/**
+ * Declare delegated BaseRichSpout with given field names
+ *
+ * @param delegate delegated BaseRichSpout
+ * @param outputFields given field names
+ */
+case class SpoutProxy(delegate: BaseRichSpout, outputFields: java.util.List[String]) extends BaseRichSpout{
+  /** Forwards to the delegate. */
+  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
+    this.delegate.open(conf, context, collector)
+  }
+
+  /** Forwards to the delegate. */
+  override def nextTuple(): Unit = {
+    this.delegate.nextTuple()
+  }
+
+  /** Forwards to the delegate. */
+  override def ack(msgId: AnyRef): Unit = {
+    this.delegate.ack(msgId)
+  }
+
+  /** Forwards to the delegate. */
+  override def fail(msgId: AnyRef): Unit = {
+    this.delegate.fail(msgId)
+  }
+
+  /**
+   * Forwards to the delegate. This used to be missing although `deactivate` was forwarded,
+   * so the delegate was told about deactivation but never about activation.
+   */
+  override def activate(): Unit = {
+    this.delegate.activate()
+  }
+
+  /** Forwards to the delegate. */
+  override def deactivate(): Unit = {
+    this.delegate.deactivate()
+  }
+
+  /** Declares `outputFields`; the delegate's own field declaration is deliberately not consulted. */
+  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+    declarer.declare(new Fields(outputFields))
+  }
+
+  /** Forwards to the delegate. */
+  override def close(): Unit = {
+    this.delegate.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala
new file mode 100644
index 0000000..2a2e268
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala
@@ -0,0 +1,47 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+import backtype.storm.topology.base.BaseRichBolt
+import com.typesafe.config.Config
+
+object StormBoltFactory {
+  /**
+   * Builds the Storm bolt that runs the given producer.
+   *
+   * A flat-map producer is wrapped according to the runtime type of its worker (Java executor
+   * first, then Scala executor), after the worker has received the config through
+   * `prepareConfig`. Filter and map producers are wrapped around their function.
+   *
+   * @param graph    not read by this method
+   * @param producer producer to wrap
+   * @param config   configuration handed to flat-map workers
+   * @return the bolt wrapping the producer
+   * @throws UnsupportedOperationException for any other producer or worker type
+   */
+  def getBoltWrapper(graph: AbstractStreamProducerGraph, producer : StreamProducer, config : Config) : BaseRichBolt = {
+    producer match{
+      case FlatMapProducer(_, worker) =>
+        worker match {
+          case _: JavaStormStreamExecutor[_] =>
+            val javaExecutor = worker.asInstanceOf[JavaStormStreamExecutor[EagleTuple]]
+            javaExecutor.prepareConfig(config)
+            JavaStormBoltWrapper(javaExecutor)
+          case _: StormStreamExecutor[_] =>
+            val scalaExecutor = worker.asInstanceOf[StormStreamExecutor[EagleTuple]]
+            scalaExecutor.prepareConfig(config)
+            StormBoltWrapper(scalaExecutor)
+          case _ =>
+            throw new UnsupportedOperationException
+        }
+      case FilterProducer(_, fn) => FilterBoltWrapper(fn)
+      case MapProducer(_, n, fn) => MapBoltWrapper(n, fn)
+      case _ => throw new UnsupportedOperationException
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala
new file mode 100644
index 0000000..7f27483
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.task.{OutputCollector, TopologyContext}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.tuple.{Fields, Tuple}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Storm bolt that feeds every incoming tuple to a StormStreamExecutor and emits whatever the
+ * executor collects.
+ *
+ * @param worker the executor that does the actual flat-map work
+ */
+case class StormBoltWrapper(worker : StormStreamExecutor[EagleTuple]) extends BaseRichBolt{
+  // Name the logger after the bolt class itself; StormBoltWrapper.getClass refers to the
+  // companion object and would yield the synthetic class "StormBoltWrapper$".
+  val LOG = LoggerFactory.getLogger(classOf[StormBoltWrapper])
+  // assigned in prepare(); stays null until Storm has prepared the bolt
+  var _collector : OutputCollector = null
+
+  /**
+   * Remembers the output collector and initialises the wrapped executor.
+   */
+  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
+    _collector = collector
+    worker.init
+  }
+
+  /**
+   * Runs the executor on the values of the input tuple. Every collected result is emitted with the
+   * input tuple as anchor. The input is acked once the executor returns normally.
+   *
+   * @throws RuntimeException wrapping any Exception raised by the executor; the input tuple is
+   *                          logged and failed before the exception is rethrown
+   */
+  override def execute(input : Tuple): Unit = {
+    try {
+      worker.flatMap(input.getValues.asScala, new Collector[EagleTuple] {
+        override def collect(t: EagleTuple): Unit = {
+          _collector.emit(input, t.getList.asJava)
+        }
+      })
+    }catch{
+      case ex: Exception => {
+        LOG.error("fail executing", ex)
+        _collector.fail(input)
+        throw new RuntimeException(ex)
+      }
+    }
+    // only reached when no exception was thrown above
+    _collector.ack(input)
+  }
+
+  /**
+   * Declares the output fields reported by the wrapped executor.
+   */
+  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
+    val fields = worker.fields
+    LOG.info("output fields for worker " + worker + " : " + fields.toList)
+    declarer.declare(new Fields(fields:_*))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala
new file mode 100644
index 0000000..6ff1d52
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala
@@ -0,0 +1,43 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import java.util
+
+import com.typesafe.config.Config
+
+/**
+ * Adapts an executor that emits 2-field tuples into one that emits 3-field tuples by inserting
+ * the stream name between the two original fields.
+ *
+ * @param delegate   the wrapped executor producing 2-field tuples
+ * @param streamName the value placed in the second field of every emitted tuple
+ */
+case class StormExecutorForAlertWrapper(delegate: StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]], streamName: String)
+  extends StormStreamExecutor3[String, String, util.SortedMap[Object, Object]]{
+
+  /** Passes the configuration on to the wrapped executor. */
+  override def prepareConfig(config: Config): Unit = delegate.prepareConfig(config)
+
+  /** Initialises the wrapped executor. */
+  override def init: Unit = delegate.init
+
+  /**
+   * Runs the wrapped executor on the input and forwards each of its results to the given
+   * collector as (first field, streamName, second field).
+   */
+  override def flatMap(input: Seq[AnyRef], collector: Collector[Tuple3[String, String, util.SortedMap[Object, Object]]]): Unit = {
+    val expanding = new Collector[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]] {
+      override def collect(pair: Tuple2[String, util.SortedMap[AnyRef, AnyRef]]): Unit =
+        collector.collect(Tuple3(pair.f0, streamName, pair.f1))
+    }
+    delegate.flatMap(input, expanding)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala
new file mode 100644
index 0000000..dcb51fd
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala
@@ -0,0 +1,46 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.topology.base.BaseRichSpout
+import com.typesafe.config.Config
+
+object StormSpoutFactory {
+  /**
+   * Returns the spout for a source producer.
+   *
+   * When the producer declares a positive number of fields, its source is wrapped in a SpoutProxy
+   * together with generated field names (FIELD_PREFIX followed by the index, starting at 0).
+   * Otherwise the source is returned as is.
+   *
+   * @param config context configuration (not read by this method)
+   * @param sourceProducer source producer
+   * @return the producer's own source, or a SpoutProxy around it
+   */
+  def createSpout(config: Config, sourceProducer: StormSourceProducer) : BaseRichSpout = {
+    val fieldCount = sourceProducer.numFields
+    if (fieldCount > 0) {
+      val fieldNames = new util.ArrayList[String]
+      (0 until fieldCount).foreach(idx => fieldNames.add(OutputFieldNameConst.FIELD_PREFIX + idx))
+      SpoutProxy(sourceProducer.source, fieldNames)
+    } else {
+      sourceProducer.source
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala
new file mode 100644
index 0000000..f4129ae
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala
@@ -0,0 +1,68 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+import scala.collection.JavaConverters._
+import scala.collection.{JavaConversions, mutable}
+
+/**
+ * Wrapper of a DAG, used by the Storm topology compiler.
+ *
+ * @param graph the underlying graph; every operation below is forwarded to it
+ */
+class StormStreamDAG(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector]) extends AbstractStreamProducerGraph {
+  // name -> producer index; null until setNodeMap has been called
+  var nodeMap: mutable.Map[String, StreamProducer] = null
+
+  /** Adds an edge between two producers to the underlying graph. */
+  override def addEdge(from: StreamProducer, to: StreamProducer, streamConnector: StreamConnector): Unit = {
+    graph.addEdge(from, to, streamConnector)
+  }
+
+  /** Adds a producer as a vertex of the underlying graph. */
+  override def addVertex(producer: StreamProducer): Unit = {
+    graph.addVertex(producer)
+  }
+
+  /** Iterates over the producers in the order given by the underlying graph's iterator. */
+  override def iterator(): Iterator[StreamProducer] = graph.iterator().asScala
+
+  /** A producer is a source when it has no incoming edge. */
+  override def isSource(v: StreamProducer): Boolean = graph.inDegreeOf(v) == 0
+
+  /** Returns a Scala view of the edges leaving the given producer. */
+  override def outgoingEdgesOf(v: StreamProducer): scala.collection.mutable.Set[StreamConnector] =
+    graph.outgoingEdgesOf(v).asScala
+
+  /**
+   * Looks a producer up by name.
+   *
+   * @return the producer, or None when the name is unknown or no node map has been set yet
+   */
+  override def getNodeByName(name: String): Option[StreamProducer] = {
+    // Option(...) guards against the node map not having been set, which used to end in a
+    // NullPointerException
+    Option(nodeMap).flatMap(_.get(name))
+  }
+
+  /** Installs the name -> producer index used by getNodeByName. */
+  def setNodeMap(nodeMap: mutable.Map[String, StreamProducer]): Unit = {
+    this.nodeMap = nodeMap
+  }
+
+  /** Collects the source producer of every edge arriving at the given producer into a new set. */
+  override def incomingVertexOf(v: StreamProducer): scala.collection.mutable.Set[StreamProducer] = {
+    val sources = mutable.Set[StreamProducer]()
+    graph.incomingEdgesOf(v).asScala.foreach(edge => sources += graph.getEdgeSource(edge))
+    sources
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala
new file mode 100644
index 0000000..254d84b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala
@@ -0,0 +1,46 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import scala.collection.mutable
+
+/**
+ * Converts the generic DAG data structure into the Storm specific DAG data structure, which is
+ * easier for the topology compiler to work with.
+ */
+object StormStreamDAGTransformer {
+  /**
+   * Transform DirectedAcyclicGraph[StreamProducer, StreamConnector] into StormStreamDAG.
+   *
+   * The resulting StormStreamDAG wraps the given graph and carries a map from producer name to
+   * producer built from every vertex of the graph.
+   *
+   * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector]
+   * @return StormStreamDAG
+   */
+  def transform(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) : StormStreamDAG = {
+    import scala.collection.JavaConverters._
+
+    val producersByName = mutable.HashMap[String, StreamProducer]()
+    val wrapped = new StormStreamDAG(dag)
+    // a later producer with the same name replaces an earlier one, as with plain put
+    dag.iterator().asScala.foreach(producer => producersByName.put(producer.name, producer))
+    wrapped.setNodeMap(producersByName)
+    wrapped
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
new file mode 100644
index 0000000..dbe69d2
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
@@ -0,0 +1,98 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
+import backtype.storm.tuple.Fields
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+/**
+ * Compiles a stream producer graph into a Storm topology: sources become spouts, the target of
+ * every edge becomes a bolt, and every edge becomes a grouping between the two.
+ *
+ * @param config configuration handed to the spout and bolt factories and to the executor
+ * @param graph  the producer graph to compile
+ */
+case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGraph) extends AbstractTopologyCompiler{
+  // named after the compiler class itself rather than its companion object ("StormTopologyCompiler$")
+  val LOG = LoggerFactory.getLogger(classOf[StormTopologyCompiler])
+  // NOTE(review): nothing in this class adds entries to this cache, so lookups only hit when a
+  // caller fills it from outside - confirm whether it is still needed
+  val boltCache = scala.collection.mutable.Map[StreamProducer, StormBoltWrapper]()
+
+  /**
+   * Walks the graph and registers spouts, bolts and groupings with a TopologyBuilder.
+   *
+   * A bolt is built and declared the first time an edge points at it; later edges to the same
+   * bolt only add another grouping to the existing declarer.
+   *
+   * @return an executor for the created topology
+   * @throws IllegalArgumentException when an edge points at a name the graph does not know
+   */
+  override def buildTopology: AbstractTopologyExecutor ={
+    val builder = new TopologyBuilder()
+    val boltDeclarerCache = scala.collection.mutable.Map[String, BoltDeclarer]()
+    graph.iterator().foreach(from => {
+      val fromName = from.name
+      if(graph.isSource(from)){
+        val spout = StormSpoutFactory.createSpout(config, from.asInstanceOf[StormSourceProducer])
+        builder.setSpout(fromName, spout, from.parallelism)
+        LOG.info("Spout name : " + fromName + " with parallelism " + from.parallelism)
+      } else {
+        LOG.info("Bolt name:" + fromName)
+      }
+
+      graph.outgoingEdgesOf(from).foreach(sc => {
+        val toName = sc.to.name
+        // Build the bolt only when it has not been declared yet. Building it for every edge
+        // created (and configured) a throw-away bolt for each additional incoming edge.
+        val boltDeclarer = boltDeclarerCache.getOrElseUpdate(toName, {
+          val toBolt = createBoltIfAbsent(toName)
+          val finalParallelism: Int = graph.getNodeByName(toName).map(_.parallelism).getOrElse(1)
+          val declarer = builder.setBolt(toName, toBolt, finalParallelism)
+          LOG.info("created bolt " + toName + " with parallelism " + finalParallelism)
+          declarer
+        })
+        sc.groupByFields match{
+          case Nil => boltDeclarer.shuffleGrouping(fromName)
+          case p => boltDeclarer.fieldsGrouping(fromName, new Fields(fields(p)))
+        }
+        LOG.info("bolt connected " + fromName + "->" + toName + " with groupby fields " + sc.groupByFields)
+      })
+    })
+    new StormTopologyExecutorImpl(builder.createTopology, config)
+  }
+
+  /**
+   * Translates field indexes into output field names (FIELD_PREFIX followed by the index).
+   *
+   * @param fields field indexes
+   * @return the field names, in the order of the given indexes
+   */
+  def fields(fields : Seq[Int]): java.util.List[String] ={
+    val ret = new util.ArrayList[String]
+    fields.foreach(n => ret.add(OutputFieldNameConst.FIELD_PREFIX + n))
+    ret
+  }
+
+  /**
+   * Resolves the producer with the given name and returns its bolt.
+   *
+   * @throws IllegalArgumentException when the graph has no producer with that name
+   */
+  def createBoltIfAbsent(name : String) : BaseRichBolt = {
+    graph.getNodeByName(name) match{
+      case Some(p) => createBoltIfAbsent(graph, p)
+      case None => throw new IllegalArgumentException("please check bolt name " + name)
+    }
+  }
+
+  /**
+   * Returns the bolt stored in boltCache for the producer, or asks StormBoltFactory for a new one.
+   * A newly built bolt is not stored in the cache.
+   */
+  def createBoltIfAbsent(graph: AbstractStreamProducerGraph, producer : StreamProducer): BaseRichBolt ={
+    boltCache.get(producer) match{
+      case Some(bolt) => bolt
+      case None => StormBoltFactory.getBoltWrapper(graph, producer, config)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
new file mode 100644
index 0000000..4cecb85
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import backtype.storm.generated.StormTopology
+import backtype.storm.utils.Utils
+import backtype.storm.{Config, LocalCluster, StormSubmitter}
+import storm.trident.spout.RichSpoutBatchExecutor
+
+/**
+ * Runs a compiled Storm topology, either on a remote cluster or on an in-process local cluster.
+ *
+ * @param topology the topology to submit
+ * @param config   must provide "envContextConfig.mode" and "envContextConfig.topologyName"
+ */
+case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesafe.config.Config) extends AbstractTopologyExecutor {
+  /**
+   * Submits the topology.
+   *
+   * When "envContextConfig.mode" equals "local" (ignoring case) the topology runs on a
+   * LocalCluster and this call blocks for Integer.MAX_VALUE milliseconds before killing the
+   * topology and shutting the cluster down. In any other mode the topology is handed to
+   * StormSubmitter.
+   */
+  @throws(classOf[Exception])
+  def execute: Unit = {
+    val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase("local")
+    val conf: Config = new Config
+    conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024))
+    conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8))
+    conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32))
+    conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384))
+    conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384))
+
+    val topologyName = config.getString("envContextConfig.topologyName")
+    if (!localMode) {
+      StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology)
+    }
+    else {
+      val cluster: LocalCluster = new LocalCluster
+      // The finally blocks make sure the local cluster is released even when submitting fails
+      // or the sleep is cut short by an exception.
+      try {
+        cluster.submitTopology(topologyName, conf, topology)
+        try {
+          Utils.sleep(Integer.MAX_VALUE)
+        } finally {
+          cluster.killTopology(topologyName)
+        }
+      } finally {
+        cluster.shutdown()
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
new file mode 100644
index 0000000..fa83e6d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
@@ -0,0 +1,192 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import java.util
+
+import com.typesafe.config.Config
+import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl
+import org.apache.eagle.executor.AlertExecutorCreationUtils
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Replaces every AlertStreamSink found in the DAG with the alert executors that serve it.
+ *
+ * The constraints for alerting are:
+ * 1. only a MapProducer, a FlatMapProducer or a StreamUnionProducer may be put immediately before
+ *    an AlertStreamSink (a producer without incoming edges is accepted as well)
+ * 2. for StreamUnionProducer, the only supported unioned producers are MapProducer and FlatMapProducer
+ * 3. the output of MapProducer and FlatMapProducer is a 2-field tuple, key and value; the key is a
+ *    string and the value has to be a SortedMap
+ * 4. the framework wraps the original MapProducer and FlatMapProducer so that they emit a 3-field
+ *    tuple, {key, streamName, value}
+ * 5. the framework automatically partitions traffic by the first field
+ *
+ * The expansion takes 2 steps:
+ * step 1: wrap the previous StreamProducer so that it emits one more field, "streamName"
+ * step 2: partition the alert executors by the policy partitioner class
+ */
+class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
+  val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion])
+
+  /**
+   * Visits every edge of the DAG and lets onIteration collect the edges to add and the vertices
+   * to remove. The changes are applied only after the iteration has finished, so the DAG is not
+   * modified while it is being traversed.
+   *
+   * @param dag the graph to expand in place
+   */
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit ={
+    val iter = dag.iterator()
+    val toBeAddedEdges = new ListBuffer[StreamConnector]
+    val toBeRemovedVertex = new ListBuffer[StreamProducer]
+    while(iter.hasNext) {
+      val current = iter.next()
+      dag.outgoingEdgesOf(current).foreach(edge => {
+        val child = edge.to
+        onIteration(toBeAddedEdges, toBeRemovedVertex, dag, current, child)
+      })
+    }
+    // add back edges
+    toBeAddedEdges.foreach(e => {
+      dag.addVertex(e.from)
+      dag.addVertex(e.to)
+      dag.addEdge(e.from, e.to, e)
+    })
+    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
+  }
+
+  /**
+   * Handles one edge current -> child. Nothing happens unless child is an AlertStreamSink.
+   *
+   * @param toBeAddedEdges    receives the edges to add once the traversal is over
+   * @param toBeRemovedVertex receives the vertices to remove once the traversal is over
+   * @param dag               the graph being expanded (read only here)
+   * @param current           the producer the edge starts from
+   * @param child             the producer the edge points at
+   * @throws IllegalStateException when current is not a supported predecessor of an alert sink
+   */
+  def onIteration(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer],
+               dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, child: StreamProducer): Unit = {
+    child match {
+      case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer) => {
+        /**
+         * step 1: wrapper previous StreamProducer with one more field "streamName"
+         * for AlertStreamSink, we check previous StreamProducer and replace that
+         */
+        val newStreamProducers = new ListBuffer[StreamProducer]
+        current match {
+          case StreamUnionProducer(id, others) => {
+            // producers feeding the union take the first stream name, the unioned ones the
+            // following names in order
+            val incomingEdges = dag.incomingEdgesOf(current)
+            incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0)))
+            var i: Int = 1
+            others.foreach(o => {
+              newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i))
+              i += 1
+            })
+          }
+          case _: FlatMapProducer[AnyRef, AnyRef] => {
+            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+          }
+          case _: MapProducer => {
+            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+          }
+          case s: StreamProducer if dag.inDegreeOf(s) == 0 => {
+            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+          }
+          case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported")
+        }
+
+        /**
+         * step 2: partition alert executor by policy partitioner class
+         */
+        val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, new AlertDefinitionDAOImpl(config), upStreamNames, alertExecutorId)
+        val alertProducers = new scala.collection.mutable.MutableList[StreamProducer]
+        alertExecutors.foreach(exec => {
+          val t = FlatMapProducer(UniqueId.incrementAndGetId(), exec).withName(exec.getAlertExecutorId() + "_" + exec.getPartitionSeq())
+          t.setConfig(config)
+          t.setGraph(dag)
+          alertProducers += t
+          // every wrapped upstream producer feeds every alert executor, grouped by field 0
+          newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0)))
+        })
+
+        // remove AlertStreamSink
+        toBeRemovedVertex += child
+
+        // add alert consumer if necessary
+        if (withConsumer) {
+          AlertExecutorConsumerUtils.setupAlertConsumers(toBeAddedEdges, alertProducers.toList)
+        }
+      }
+      case _ =>
+    }
+  }
+
+  /**
+   * Creates the producer that emits the 3-field form {key, streamName, value} for the given
+   * producer and records the graph changes needed to put it in place.
+   *
+   * A FlatMapProducer or MapProducer is replaced: its edges are re-created on the new producer
+   * and the old producer is scheduled for removal. A producer without incoming edges is kept and
+   * a new MapProducer is appended behind it.
+   *
+   * @param upStreamName the stream name to insert as the second field
+   * @return the new producer
+   * @throws IllegalArgumentException when the producer, its mapper or its output cannot be handled
+   */
+  private def replace(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer],
+                      dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, upStreamName: String) : StreamProducer= {
+    var newsp: StreamProducer = null
+    current match {
+      case _: FlatMapProducer[AnyRef, AnyRef] => {
+        val mapper = current.asInstanceOf[FlatMapProducer[_, _]].mapper
+        mapper match {
+          case a: JavaStormStreamExecutor[EagleTuple] => {
+            val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
+            newsp = FlatMapProducer(UniqueId.incrementAndGetId(), newmapper)
+            newsp.setGraph(dag)
+            newsp.setConfig(config)
+          }
+          case b: StormStreamExecutor[EagleTuple] => {
+            val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
+            newsp = FlatMapProducer(UniqueId.incrementAndGetId(), newmapper)
+            newsp.setGraph(dag)
+            newsp.setConfig(config)
+          }
+          case _ => throw new IllegalArgumentException
+        }
+        // remove old StreamProducer and replace that with new StreamProducer
+        val incomingEdges = dag.incomingEdgesOf(current)
+        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
+        val outgoingEdges = dag.outgoingEdgesOf(current)
+        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to))
+        toBeRemovedVertex += current
+      }
+      case _: MapProducer => {
+        val mapper = current.asInstanceOf[MapProducer].fn
+        // wraps the original function: its 2-tuple result gets the stream name inserted
+        val newfun: (AnyRef => AnyRef) = {
+          a => mapper(a) match {
+            case scala.Tuple2(x1, x2) => (x1, upStreamName, x2)
+            case _ => throw new IllegalArgumentException
+          }
+        }
+        current match {
+          case MapProducer(id, 2, fn) => newsp = MapProducer(UniqueId.incrementAndGetId(), 3, newfun)
+          case _ => throw new IllegalArgumentException
+        }
+        val incomingEdges = dag.incomingEdgesOf(current)
+        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
+        val outgoingEdges = dag.outgoingEdgesOf(current)
+        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to))
+        toBeRemovedVertex += current
+      }
+      case s: StreamProducer if dag.inDegreeOf(s) == 0 => {
+        val fn:(AnyRef => AnyRef) = {
+          n => {
+            n match {
+              // A value that already is a 3-tuple passes through untouched. This must be a type
+              // pattern: "case scala.Tuple3" compares against the Tuple3 companion object and
+              // never matches a tuple instance.
+              case _: scala.Tuple3[_, _, _] => n
+              case scala.Tuple2(x1,x2) => (x1,upStreamName,x2)
+              case scala.Tuple1(x1) => (if(x1 == null) null else x1.hashCode(),upStreamName,x1)
+              case _ => (if(n == null) null else n.hashCode(),upStreamName,n)
+            }
+          }
+        }
+        newsp = MapProducer(UniqueId.incrementAndGetId(),3,fn)
+        // the source itself stays in the graph; the new producer is appended behind it
+        toBeAddedEdges += StreamConnector(current,newsp)
+        val outgoingEdges = dag.outgoingEdgesOf(current)
+        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp,e.to))
+      }
+      case _ => throw new IllegalArgumentException("Only FlatMapProducer and MapProducer can be replaced before AlertStreamSink")
+    }
+    newsp
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala
new file mode 100644
index 0000000..c6a3d3d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import backtype.storm.topology.base.BaseRichSpout
+import com.typesafe.config._
+import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider
+import org.apache.eagle.dataproc.util.ConfigOptionParser
+
+import scala.reflect.runtime.universe._
+
+/**
+ * Typed access to a Typesafe `Config` owned by the implementing class.
+ *
+ * Implementors provide storage through `set(config)` / `config`; this trait adds a
+ * key-level `set` and type-directed `get` on top of it.
+ *
+ * @since  11/6/15
+ */
+trait ConfigContext{
+  /** Replaces the configuration held by this context. */
+  def set(config:Config)
+
+  /** Returns the configuration currently held by this context. */
+  def config:Config
+
+  /**
+   * Stores `value` under `key`: a new configuration is derived from the current one
+   * with `withValue` and handed to `set(config)`.
+   *
+   * @param key   config key
+   * @param value value to store, wrapped with `ConfigValueFactory.fromAnyRef`
+   * @tparam T    value type
+   */
+  def set[T<:AnyRef](key:String,value:T): Unit = {
+    set(config.withValue(key,ConfigValueFactory.fromAnyRef(value)))
+  }
+
+  /**
+   * Reads `key` as a `T`, falling back to `default` when the path is absent.
+   *
+   * @param key config key
+   * @param default value returned when `config.hasPath(key)` is false
+   * @tparam T return type; must be one of the types supported by `get(key)`
+   * @return the configured value, or `default`
+   */
+  def get[T](key:String,default:T=null)(implicit tag:TypeTag[T]):T = {
+    if(config.hasPath(key)) get[T](key) else default
+  }
+
+  /**
+   * Reads `key` with the `Config` accessor that corresponds to `T`.
+   *
+   * Supported types: String, Double, Long, Int, Boolean, Number, ConfigObject,
+   * ConfigValue, AnyRef and Scala `List` of Int, Double, Boolean or Long.
+   *
+   * @param key config key
+   * @tparam T requested type
+   * @return the value stored under `key`
+   * @throws UnsupportedOperationException when `T` is not one of the supported types
+   */
+  def get[T](key:String)(implicit tag: TypeTag[T]):T = {
+    import scala.collection.JavaConverters._
+
+    // Types are compared with =:= (type equivalence). The previous pattern match
+    // compared the requested Type against TypeTag instances for the primitive cases,
+    // which can never be equal, so Double/Long/Int/Boolean were always rejected.
+    val requested = tag.tpe
+    val value: Any =
+      if (requested =:= STRING_TYPE) config.getString(key)
+      else if (requested =:= typeOf[Double]) config.getDouble(key)
+      else if (requested =:= typeOf[Long]) config.getLong(key)
+      else if (requested =:= typeOf[Int]) config.getInt(key)
+      else if (requested =:= typeOf[Boolean]) config.getBoolean(key)
+      else if (requested =:= NUMBER_TYPE) config.getNumber(key)
+      else if (requested =:= OBJECT_TYPE) config.getObject(key)
+      else if (requested =:= VALUE_TYPE) config.getValue(key)
+      else if (requested =:= ANY_REF_TYPE) config.getAnyRef(key)
+      // Config returns java.util.List; convert so the result really is the Scala
+      // List the caller asked for instead of failing with a ClassCastException.
+      else if (requested =:= INT_LIST_TYPE) config.getIntList(key).asScala.toList
+      else if (requested =:= DOUBLE_LIST_TYPE) config.getDoubleList(key).asScala.toList
+      else if (requested =:= BOOL_LIST_TYPE) config.getBooleanList(key).asScala.toList
+      else if (requested =:= LONG_LIST_TYPE) config.getLongList(key).asScala.toList
+      // Byte is intentionally not mapped: Config.getBytes yields a size-in-bytes Long,
+      // not a Byte, and the former Byte case was unreachable, so requesting Byte still
+      // ends here.
+      else throw new UnsupportedOperationException(s"$tag is not supported yet")
+    value.asInstanceOf[T]
+  }
+
+  val STRING_TYPE = typeOf[String]
+  val NUMBER_TYPE = typeOf[Number]
+  val INT_LIST_TYPE = typeOf[List[Int]]
+  val BOOL_LIST_TYPE = typeOf[List[Boolean]]
+  val DOUBLE_LIST_TYPE = typeOf[List[Double]]
+  // Was typeOf[List[Double]], which made List[Long] impossible to request.
+  val LONG_LIST_TYPE = typeOf[List[Long]]
+  val OBJECT_TYPE = typeOf[ConfigObject]
+  val VALUE_TYPE = typeOf[ConfigValue]
+  val ANY_REF_TYPE = typeOf[AnyRef]
+}
+
+/**
+ * Stream APP DSL
+ *
+ * Entry-point trait: `main` loads the configuration from the command line, runs the
+ * application body through `App.main`, and executes the environment afterwards unless
+ * `execute()` has already been called.
+ *
+ * @tparam E type of the execution environment the application runs on
+ */
+trait StreamApp[+E<:ExecutionEnvironment] extends App with ConfigContext{
+  private var currentConfig:Config = null
+  private var alreadyExecuted = false
+
+  /** Execution environment of this application. */
+  def env:E
+
+  override def config:Config = currentConfig
+
+  override def set(config:Config): Unit = {
+    currentConfig = config
+  }
+
+  /** Executes the environment and remembers that it ran, so `main` does not run it again. */
+  def execute(): Unit = {
+    env.execute()
+    alreadyExecuted = true
+  }
+
+  /**
+   * Loads the configuration from `args` with `ConfigOptionParser`, delegates to
+   * `App.main`, then calls `execute()` if that has not happened yet.
+   *
+   * @param args command line arguments
+   */
+  override def main(args: Array[String]): Unit = {
+    currentConfig = new ConfigOptionParser().load(args)
+    super.main(args)
+    if(!alreadyExecuted) {
+      execute()
+    }
+  }
+}
+
+/**
+ * [[StreamApp]] bound to a `StormExecutionEnvironment`, which is created from the
+ * current configuration the first time `env` is read.
+ */
+trait StormStreamApp extends StreamApp[StormExecutionEnvironment]{
+  private var stormEnv:StormExecutionEnvironment = null
+
+  /** Returns the environment, obtaining it from `ExecutionEnvironmentFactory` on first use. */
+  override def env:StormExecutionEnvironment = {
+    if(stormEnv == null){
+      stormEnv = ExecutionEnvironmentFactory.getStorm(config)
+    }
+    stormEnv
+  }
+
+  /** Registers `spout` as a new source of the environment. */
+  def source(spout:BaseRichSpout) = env.newSource(spout)
+
+  /**
+   * Asks `sourceProvider` for a spout built from the current configuration and
+   * registers it as a new source of the environment.
+   */
+  def source(sourceProvider: AbstractStormSpoutProvider) = {
+    // The spout is resolved before `env` is touched, as in the original ordering.
+    val providedSpout = sourceProvider.getSpout(config)
+    env.newSource(providedSpout)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
new file mode 100644
index 0000000..0cece47
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
@@ -0,0 +1,28 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+/**
+ * Edge of the stream DAG, pointing from one producer to another.
+ *
+ * `groupByFields` is a mutable member outside the constructor, so it does not take
+ * part in the generated `equals`/`hashCode`.
+ *
+ * @param from upstream producer
+ * @param to   downstream producer
+ */
+case class StreamConnector(from: StreamProducer, to: StreamProducer) {
+  /** Field indices attached to this edge; empty unless `groupBy` was called. */
+  var groupByFields : Seq[Int] = Nil
+
+  /**
+   * Records the field indices on this edge.
+   *
+   * @param fields field indices
+   * @return this connector, for chaining
+   */
+  def groupBy(fields : Seq[Int]) : StreamConnector = {
+    this.groupByFields = fields
+    this
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala
new file mode 100644
index 0000000..7e15233
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala
@@ -0,0 +1,27 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+/**
+ * Base class of the passes that rewrite a stream DAG in place.
+ *
+ * @param config context configuration made available to the concrete pass
+ */
+abstract class StreamDAGExpansion(config: Config) {
+  /**
+   * Applies this pass to `dag`.
+   *
+   * @param dag graph of producers (vertices) and connectors (edges)
+   */
+  def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
new file mode 100644
index 0000000..42bc9a8
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
@@ -0,0 +1,59 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Replace GroupByProducer(Vertex) with StreamConnector (Edge)
+ *
+ * For every parent of a GroupByProducer, one edge per child of that GroupByProducer is
+ * created from the parent to the child, carrying the group-by fields. The
+ * GroupByProducer vertex is removed afterwards.
+ *
+ * @param config context configuration
+ */
+class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){
+  val LOG = LoggerFactory.getLogger(classOf[StreamGroupbyExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) = {
+    // Changes are collected first and applied after the traversal has finished.
+    val pendingEdges = new ListBuffer[StreamConnector]
+    val obsoleteVertices = new ListBuffer[StreamProducer]
+
+    val vertices = dag.iterator()
+    while(vertices.hasNext) {
+      val parent = vertices.next()
+      for (edge <- dag.outgoingEdgesOf(parent)) {
+        edge.to match {
+          case grouping : GroupByProducer =>
+            for (downstream <- dag.outgoingEdgesOf(grouping)) {
+              pendingEdges += StreamConnector(parent, downstream.to).groupBy(grouping.fields)
+            }
+            obsoleteVertices += grouping
+          case _ =>
+        }
+      }
+    }
+
+    // add the bypassing edges, then drop the group-by vertices
+    pendingEdges.foreach(e => dag.addEdge(e.from, e.to, e))
+    obsoleteVertices.foreach(v => dag.removeVertex(v))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala
new file mode 100644
index 0000000..5208a97
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala
@@ -0,0 +1,41 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+/**
+ * Sets the name of each StreamProducer in the DAG to the one chosen by NodeNameSelector.
+ * According to the original notes:
+ * 1. if name is given programmatically, then use this name
+ * 2. otherwise use name generated by scala internally
+ *
+ * @param config context configuration
+ */
+class StreamNameExpansion(config: Config) extends StreamDAGExpansion(config){
+  val LOG = LoggerFactory.getLogger(classOf[StreamNameExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit = {
+    val vertices = dag.iterator()
+    while(vertices.hasNext){
+      val producer = vertices.next()
+      val selectedName = NodeNameSelector(producer).getName
+      producer.name = selectedName
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala
new file mode 100644
index 0000000..0264ca6
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala
@@ -0,0 +1,55 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import java.util.regex.Pattern
+
+import com.typesafe.config.{ConfigValue, ConfigObject, Config}
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConverters._
+
+/**
+ * Applies the parallelism configured under `envContextConfig.parallelismConfig` to the
+ * producers of the DAG.
+ *
+ * Each key of that config object is compiled as a regular expression; a producer whose
+ * name contains a match (`Matcher.find`) gets the associated parallelism. Producers
+ * without a name are left untouched. When several patterns match the same producer the
+ * one visited last wins.
+ *
+ * @param config context configuration
+ */
+class StreamParallelismConfigExpansion(config: Config) extends StreamDAGExpansion(config){
+  val LOG = LoggerFactory.getLogger(classOf[StreamParallelismConfigExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) = {
+    val map = getParallelismMap(config)
+    val iter = dag.iterator()
+    while(iter.hasNext){
+      val streamProducer = iter.next()
+      if(streamProducer.name != null) {
+        map.foreach { case (pattern, parallelism) =>
+          if(pattern.matcher(streamProducer.name).find()) {
+            streamProducer.parallelism = parallelism
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads the pattern-to-parallelism table from `config`.
+   *
+   * `Config.getObject` throws when the path is missing, so presence is checked with
+   * `hasPath` first; a missing section yields an empty table instead of aborting the
+   * expansion.
+   *
+   * @param config context configuration
+   * @return compiled name patterns mapped to their parallelism; empty when the section
+   *         is absent
+   */
+  private def getParallelismMap(config: Config) : Map[Pattern, Int]= {
+    val path = "envContextConfig.parallelismConfig"
+    val found = config.hasPath(path)
+    LOG.info("Found parallelismConfig ? " + (if (found) "yes" else "no"))
+    if (!found) {
+      Map.empty[Pattern, Int]
+    } else {
+      val parallelismConfig: ConfigObject = config.getObject(path)
+      parallelismConfig.asScala.toMap map {
+        // Values are expected to be integers; any other value type fails the cast below.
+        case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int])
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
new file mode 100644
index 0000000..9fb3e22
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
@@ -0,0 +1,193 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+import java.util
+import java.util.concurrent.atomic.AtomicInteger
+
+import backtype.storm.topology.base.BaseRichSpout
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+ * StreamProducer is the base class for all other concrete StreamProducer
+ * It defines high level API for user to organize data stream flow
+ *
+ * StreamProducer is independent of execution environment
+ *
+ * Every operator method below creates a new producer with a fresh id, registers it in
+ * the graph of this producer as its child, and passes graph and config on to it.
+ */
+trait StreamProducer{
+  /** Node name; null until assigned. */
+  var name: String = null
+  /** Parallelism of this node; defaults to 1. */
+  var parallelism: Int = 1
+  /** DAG this producer belongs to. */
+  var graph: DirectedAcyclicGraph[StreamProducer, StreamConnector] = null
+  /** Configuration passed along the chain of producers. */
+  var config: Config = null
+
+  private def incrementAndGetId() = UniqueId.incrementAndGetId()
+
+  /** Hooks `next` behind this producer and returns it, keeping its precise type. */
+  private def attach[P <: StreamProducer](next: P): P = {
+    hookupDAG(graph, this, next)
+    next
+  }
+
+  /** Creates and attaches a MapProducer that declares `numOutputFields` output fields. */
+  private def mapWithArity(numOutputFields: Int, fn: AnyRef => AnyRef): StreamProducer =
+    attach(MapProducer(incrementAndGetId(), numOutputFields, fn))
+
+  /** Rejects negative field indices. */
+  private def validateFieldIndices(fields: Seq[Int]): Unit =
+    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
+
+  def setGraph(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit = this.graph = graph
+  def getGraph: DirectedAcyclicGraph[StreamProducer, StreamConnector] = graph
+  def setConfig(config: Config) : Unit = this.config = config
+  def getConfig: Config = config
+
+  /** Appends a FilterProducer holding the predicate `fn`. */
+  def filter(fn : AnyRef => Boolean): StreamProducer =
+    attach(FilterProducer(incrementAndGetId(), fn))
+
+  /** Appends a FlatMapProducer holding `mapper`. */
+  def flatMap[T, R](mapper : FlatMapper[T, R]) : StreamProducer =
+    attach(FlatMapProducer(incrementAndGetId(), mapper))
+
+  /** Appends a MapProducer declaring 1 output field. */
+  def map1(fn : AnyRef => AnyRef) : StreamProducer = mapWithArity(1, fn)
+
+  /** Appends a MapProducer declaring 2 output fields. */
+  def map2(fn : AnyRef => AnyRef) : StreamProducer = mapWithArity(2, fn)
+
+  /** Appends a MapProducer declaring 3 output fields. */
+  def map3(fn : AnyRef => AnyRef) : StreamProducer = mapWithArity(3, fn)
+
+  /** Appends a MapProducer declaring 4 output fields. */
+  def map4(fn : AnyRef => AnyRef) : StreamProducer = mapWithArity(4, fn)
+
+  /**
+   * starting from 0, groupby operator would be upon edge of the graph
+   *
+   * @param fields field indices, each >= 0
+   * @throws IllegalArgumentException when an index is negative
+   */
+  def groupBy(fields : Int*) : StreamProducer = {
+    validateFieldIndices(fields)
+    attach(GroupByProducer(incrementAndGetId(), fields))
+  }
+
+  /**
+   * groupBy java version. Indices are validated exactly like the varargs version,
+   * i.e. every index must be >= 0.
+   *
+   * The indices are copied into an immutable list of Int instead of keeping a cast
+   * wrapper around the caller's Java list, so later changes to that list cannot alter
+   * the GroupByProducer that is stored as a graph vertex.
+   *
+   * @param fields field indices, each >= 0
+   * @throws IllegalArgumentException when an index is negative
+   */
+  def groupBy(fields : java.util.List[Integer]) : StreamProducer = {
+    val indices = fields.asScala.map(_.intValue()).toList
+    validateFieldIndices(indices)
+    attach(GroupByProducer(incrementAndGetId(), indices))
+  }
+
+  /** Appends a StreamUnionProducer that references `others` as additional inputs. */
+  def streamUnion(others : Seq[StreamProducer]) : StreamProducer =
+    attach(StreamUnionProducer(incrementAndGetId(), others))
+
+  /**
+   * alert is always sink of data flow
+   */
+  def alertWithConsumer(upStreamNames: util.List[String], alertExecutorId : String): Unit = {
+    alert(upStreamNames, alertExecutorId, true)
+  }
+
+  def alertWithoutConsumer(upStreamNames: util.List[String], alertExecutorId : String): Unit = {
+    alert(upStreamNames, alertExecutorId, false)
+  }
+
+  /** Appends an AlertStreamSink; nothing is returned, so the chain ends here. */
+  def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true): Unit = {
+    attach(AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer))
+  }
+
+  def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit ={
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, true)
+  }
+
+  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit ={
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, false)
+  }
+
+  /**
+   * Adds `next` as a vertex, connects `current` to it and passes graph and config on.
+   *
+   * @param graph   not read; the graph of `current` is used. Kept for compatibility.
+   * @param current upstream producer, which must already have a graph
+   * @param next    producer to add
+   */
+  def hookupDAG(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, next: StreamProducer) = {
+    current.getGraph.addVertex(next)
+    current.getGraph.addEdge(current, next, StreamConnector(current, next))
+    passOnContext(current, next)
+  }
+
+  private def passOnContext(current: StreamProducer, next: StreamProducer): Unit ={
+    next.graph = current.graph
+    next.config = current.config
+  }
+
+  /**
+   * can be set programmatically or by configuration
+   */
+  def withParallelism(parallelism : Int) : StreamProducer = {
+    this.parallelism = parallelism
+    this
+  }
+
+  /** Sets the node name and returns this producer for chaining. */
+  def withName(name : String) : StreamProducer = {
+    this.name = name
+    this
+  }
+}
+
+/**
+ * DAG node created by `StreamProducer.filter`.
+ *
+ * @param id node id
+ * @param fn predicate attached to the node; presumably elements for which it returns
+ *           true are kept - confirm against the execution-environment implementation
+ */
+case class FilterProducer(id: Int, fn : AnyRef => Boolean) extends StreamProducer
+
+/**
+ * DAG node created by `StreamProducer.flatMap`.
+ *
+ * @param id     node id
+ * @param mapper flat-map implementation attached to the node; reassignable
+ */
+case class FlatMapProducer[T, R](id: Int, var mapper: FlatMapper[T, R]) extends StreamProducer {
+  /** Renders as the mapper's string form followed by `_` and the node id. */
+  override def toString() = s"${mapper.toString}_$id"
+}
+
+/**
+ * DAG node created by `StreamProducer.map1` .. `map4`.
+ *
+ * @param id              node id
+ * @param numOutputFields number of output fields declared for the node (1 to 4 when
+ *                        created through `map1` .. `map4`)
+ * @param fn              mapping function attached to the node; reassignable
+ */
+case class MapProducer(id: Int, numOutputFields : Int, var fn : AnyRef => AnyRef) extends StreamProducer
+
+/**
+ * DAG node created by `StreamProducer.groupBy`. StreamGroupbyExpansion later removes
+ * this vertex and moves `fields` onto the connecting edges.
+ *
+ * @param id     node id
+ * @param fields field indices to group by; `groupBy` rejects negative indices
+ */
+case class GroupByProducer(id: Int, fields : Seq[Int]) extends StreamProducer
+
+/**
+ * DAG node created by `StreamProducer.streamUnion`. StreamUnionExpansion later removes
+ * this vertex and connects its parent and every producer in `others` directly to its
+ * children.
+ *
+ * @param id     node id
+ * @param others additional producers merged with the parent stream
+ */
+case class StreamUnionProducer(id: Int, others: Seq[StreamProducer]) extends StreamProducer
+
+/**
+ * Source node of the DAG wrapping a Storm spout.
+ *
+ * @param id     node id
+ * @param source spout attached to this node
+ */
+case class StormSourceProducer(id: Int, source : BaseRichSpout) extends StreamProducer{
+  /** Value passed to `renameOutputFields`; 0 until that method is called. */
+  var numFields : Int = 0
+
+  /**
+   * rename outputfields to f0, f1, f2, ...
+   * if one spout declare some field names, those fields names will be modified
+   *
+   * This method only records `n`. NOTE(review): the renaming itself presumably
+   * happens wherever `numFields` is consumed - confirm.
+   *
+   * @param n number of output fields
+   * @return this producer, for chaining
+   */
+  def renameOutputFields(n : Int): StormSourceProducer = {
+    numFields = n
+    this
+  }
+}
+
+/**
+ * DAG node created by `StreamProducer.alert` and its `alertWith(out)Consumer` variants.
+ *
+ * @param id              node id
+ * @param upStreamNames   names of the upstream streams handed to the alert executor
+ * @param alertExecutorId id of the alert executor
+ * @param withConsumer    flag set by `alertWithConsumer` (true) / `alertWithoutConsumer`
+ *                        (false); defaults to true
+ */
+case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) extends StreamProducer
+
+/**
+ * Process-wide id generator backed by a single atomic counter that starts at 0, so the
+ * first id handed out is 1.
+ */
+object UniqueId{
+  /** Counter holding the most recently issued id. */
+  val id : AtomicInteger = new AtomicInteger(0)
+
+  /** Increments the counter and returns the new value. */
+  def incrementAndGetId() : Int = id.incrementAndGet()
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala
new file mode 100644
index 0000000..83a83fe
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala
@@ -0,0 +1,62 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * union operator should be expanded
+ *
+ * For every parent of a StreamUnionProducer and every child of that union, edges are
+ * created from the parent and from each of the union's other producers straight to the
+ * child. All of them carry the group-by fields of the edge that led from the parent to
+ * the union. The union vertex is removed afterwards.
+ *
+ * @param config context configuration
+ */
+class StreamUnionExpansion(config: Config) extends StreamDAGExpansion(config){
+  val LOG = LoggerFactory.getLogger(classOf[StreamUnionExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) = {
+    // Changes are collected first and applied after the traversal has finished.
+    val pendingEdges = new ListBuffer[StreamConnector]
+    val obsoleteVertices = new ListBuffer[StreamProducer]
+
+    val vertices = dag.iterator()
+    while(vertices.hasNext) {
+      val parent = vertices.next()
+      for (edge <- dag.outgoingEdgesOf(parent)) {
+        val inheritedGrouping = edge.groupByFields
+        edge.to match {
+          case union @ StreamUnionProducer(_, others) =>
+            for (downstream <- dag.outgoingEdgesOf(union)) {
+              pendingEdges += StreamConnector(parent, downstream.to).groupBy(inheritedGrouping)
+              for (other <- others) {
+                pendingEdges += StreamConnector(other, downstream.to).groupBy(inheritedGrouping)
+              }
+            }
+            obsoleteVertices += union
+          case _ =>
+        }
+      }
+    }
+
+    // add the bypassing edges, then drop the union vertices
+    pendingEdges.foreach(e => dag.addEdge(e.from, e.to, e))
+    obsoleteVertices.foreach(v => dag.removeVertex(v))
+  }
+}