You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:43 UTC
[36/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46]
Rename package name as "org.apache.eagle"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala
new file mode 100644
index 0000000..9ac3cc9
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AbstractTopologyExecutor.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+/**
+ * Executor abstraction for a topology.
+ */
+trait AbstractTopologyExecutor {
+  /** Runs the topology. Invoked only for its side effects (returns Unit). */
+  def execute: Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala
new file mode 100644
index 0000000..cb6ac4f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/AlertExecutorConsumerUtils.scala
@@ -0,0 +1,75 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import java.util
+
+import org.apache.eagle.alert.dedup.{AlertEmailDeduplicationExecutor, AlertEntityDeduplicationExecutor}
+import org.apache.eagle.alert.notification.AlertNotificationExecutor
+import org.apache.eagle.alert.persist.AlertPersistExecutor
+import org.apache.eagle.executor.AlertExecutor
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Creates the alert consumer executors (entity/email deduplication, persistence and notification) and links them to their immediate parent alert executors.
+ *
+ * <br/><br/>
+ * Explanations for programId, alertExecutorId and policy<br/><br/>
+ * - programId - a distributed or single-process program, for example one Storm topology<br/>
+ * - alertExecutorId - one process/thread which executes multiple policies<br/>
+ * - policy - some rules to be evaluated<br/>
+ *
+ * <br/>
+ *
+ * Normally the mapping is as follows:
+ * <pre>
+ * programId (1:N) alertExecutorId
+ * alertExecutorId (1:N) policy
+ * </pre>
+ */
+
+object AlertExecutorConsumerUtils {
+  private val LOG: Logger = LoggerFactory.getLogger(AlertExecutorConsumerUtils.getClass)
+
+  /**
+   * Appends to `toBeAddedEdges` the edges linking the alert consumer executors to the given
+   * alert executors.
+   *
+   * One entity-deduplication, persist, email-deduplication and notification executor is created
+   * and shared by all alert executors. The added edges are:
+   * {{{
+   *   every alert producer -> entity dedup -> persist
+   *   every alert producer -> email dedup  -> notification
+   * }}}
+   * The alert definition DAO of the first alert executor is used for all created executors.
+   *
+   * @param toBeAddedEdges       buffer the new edges are appended to
+   * @param alertStreamProducers alert producers; each must be a `FlatMapProducer` whose mapper is an
+   *                             [[AlertExecutor]] (a `ClassCastException` is thrown otherwise).
+   *                             When empty, a warning is logged and no edge is added.
+   */
+  def setupAlertConsumers(toBeAddedEdges: ListBuffer[StreamConnector], alertStreamProducers: List[StreamProducer]): Unit = {
+    if (alertStreamProducers.isEmpty) {
+      // No alert executor to link to and no DAO to share; taking the head would throw.
+      LOG.warn("No alert stream producers, skip setting up alert consumers")
+    } else {
+      val alertExecutors = alertStreamProducers.map(alertExecutorOf)
+      val alertExecutorIdList: util.List[String] = new util.ArrayList[String]()
+      alertExecutors.foreach(executor => alertExecutorIdList.add(executor.getAlertExecutorId))
+      val alertDefDao = alertExecutors.head.getAlertDefinitionDao
+
+      val entityDedupExecutor: AlertEntityDeduplicationExecutor = new AlertEntityDeduplicationExecutor(alertExecutorIdList, alertDefDao)
+      val emailDedupExecutor: AlertEmailDeduplicationExecutor = new AlertEmailDeduplicationExecutor(alertExecutorIdList, alertDefDao)
+      val notificationExecutor: AlertNotificationExecutor = new AlertNotificationExecutor(alertExecutorIdList, alertDefDao)
+      val persistExecutor: AlertPersistExecutor = new AlertPersistExecutor
+
+      // Ids are allocated in the same order as before: entity dedup, persist, email dedup, notification.
+      val entityDedupStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(), entityDedupExecutor)
+      val persistStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(), persistExecutor)
+      val emailDedupStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(), emailDedupExecutor)
+      val notificationStreamProducer = FlatMapProducer(UniqueId.incrementAndGetId(), notificationExecutor)
+      toBeAddedEdges += StreamConnector(entityDedupStreamProducer, persistStreamProducer)
+      toBeAddedEdges += StreamConnector(emailDedupStreamProducer, notificationStreamProducer)
+
+      alertStreamProducers.foreach { producer =>
+        toBeAddedEdges += StreamConnector(producer, entityDedupStreamProducer)
+        toBeAddedEdges += StreamConnector(producer, emailDedupStreamProducer)
+      }
+    }
+  }
+
+  /** Unwraps the [[AlertExecutor]] held by an alert `FlatMapProducer`; throws ClassCastException otherwise. */
+  private def alertExecutorOf(producer: StreamProducer): AlertExecutor =
+    producer.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor]
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
new file mode 100644
index 0000000..488d52a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import backtype.storm.topology.base.BaseRichSpout
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+/** Factory methods for execution environments. */
+object ExecutionEnvironmentFactory {
+
+  /** Creates a Storm execution environment backed by `config`. */
+  def getStorm(config: Config): StormExecutionEnvironment = new StormExecutionEnvironment(config)
+
+  /** Creates a Storm execution environment backed by `ConfigFactory.load()`. */
+  def getStorm: StormExecutionEnvironment = getStorm(ConfigFactory.load())
+}
+
+/**
+ * Base class of execution environments constructed from a [[Config]].
+ */
+abstract class ExecutionEnvironment(config : Config){
+  /** Executes the program defined in this environment; invoked for its side effects. */
+  def execute(): Unit
+}
+
+/**
+ * Execution environment that builds a DAG of stream producers and runs it on Storm.
+ *
+ * Sources are registered with `newSource`; `execute` expands the DAG, transforms it into a
+ * Storm DAG, compiles it and executes the result.
+ */
+class StormExecutionEnvironment(config: Config) extends ExecutionEnvironment(config){
+  val LOG = LoggerFactory.getLogger(classOf[StormExecutionEnvironment])
+  val dag = new DirectedAcyclicGraph[StreamProducer, StreamConnector](classOf[StreamConnector])
+
+  /**
+   * Runs the expansion stages in a fixed order, logging the graph after each one, then
+   * transforms the DAG for Storm, builds the topology and executes it.
+   */
+  override def execute() : Unit = {
+    LOG.info("initial graph:\n")
+    GraphPrinter.print(dag)
+    expandAndPrint("StreamAlertExpansion")(new StreamAlertExpansion(config).expand(dag))
+    expandAndPrint("StreamUnionExpansion")(new StreamUnionExpansion(config).expand(dag))
+    expandAndPrint("StreamGroupbyExpansion")(new StreamGroupbyExpansion(config).expand(dag))
+    expandAndPrint("StreamNameExpansion")(new StreamNameExpansion(config).expand(dag))
+    expandAndPrint("StreamParallelismConfigExpansion")(new StreamParallelismConfigExpansion(config).expand(dag))
+    val stormDag = StormStreamDAGTransformer.transform(dag)
+    StormTopologyCompiler(config, stormDag).buildTopology.execute
+  }
+
+  /** Runs one expansion stage (passed by name, evaluated here) and logs the resulting graph. */
+  private def expandAndPrint(stageName: String)(expansion: => Unit): Unit = {
+    expansion
+    LOG.info(s"after $stageName graph:")
+    GraphPrinter.print(dag)
+  }
+
+  /**
+   * Registers a spout as a new source vertex of this environment's DAG.
+   *
+   * @param source spout emitting the source stream
+   * @return the producer wrapping `source`, bound to this environment's config and DAG
+   */
+  def newSource(source: BaseRichSpout): StormSourceProducer ={
+    val ret = StormSourceProducer(UniqueId.incrementAndGetId(), source)
+    ret.config = config
+    ret.graph = dag
+    dag.addVertex(ret)
+    ret
+  }
+
+  /** Registers the spout returned by `sourceProvider.getSpout(config)` as a new source. */
+  def newSource(sourceProvider: AbstractStormSpoutProvider):StormSourceProducer = newSource(sourceProvider.getSpout(config))
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala
new file mode 100644
index 0000000..d31759b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/FilterBoltWrapper.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.task.{OutputCollector, TopologyContext}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.tuple.{Fields, Tuple}
+import org.slf4j.LoggerFactory
+
+/**
+ * Storm bolt that forwards a tuple unchanged when `fn` accepts its first value.
+ *
+ * Every input tuple is acked, whether it passes the filter or not. Acking only accepted tuples
+ * would leave rejected ones pending until they time out and are replayed.
+ *
+ * @param fn predicate applied to the tuple's first value
+ * @tparam T expected type of the first value (not checked at runtime because of erasure)
+ */
+case class FilterBoltWrapper[T](fn : T => Boolean) extends BaseRichBolt{
+  // classOf names the bolt itself; FilterBoltWrapper.getClass would name the companion object.
+  val LOG = LoggerFactory.getLogger(classOf[FilterBoltWrapper[_]])
+  var _collector : OutputCollector = null
+
+  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
+    _collector = collector
+  }
+
+  override def execute(input : Tuple): Unit = {
+    // Unchecked cast (T is erased); a value of the wrong type may fail inside fn instead.
+    val value = input.getValue(0).asInstanceOf[T]
+    if (fn(value)) {
+      _collector.emit(input, input.getValues)
+    }
+    _collector.ack(input)
+  }
+
+  /** Declares a single output field named FIELD_PREFIX + "0". */
+  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
+    declarer.declare(new Fields(OutputFieldNameConst.FIELD_PREFIX + "0"))
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala
new file mode 100644
index 0000000..1925a89
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/GraphPrinter.scala
@@ -0,0 +1,37 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConversions._
+
+object GraphPrinter {
+  private val LOG = LoggerFactory.getLogger(GraphPrinter.getClass)
+
+  /**
+   * Logs every edge of `dag` at INFO level, one line per edge, visiting vertices in the order
+   * returned by `dag.iterator()`.
+   *
+   * Each line reads `from{parallelism} => to{parallelism} with groupByFields <fields>`.
+   */
+  def print(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit ={
+    for {
+      vertex <- dag.iterator()
+      edge <- dag.outgoingEdgesOf(vertex)
+    } {
+      LOG.info(s"${edge.from}{${edge.from.parallelism}} => ${edge.to}{${edge.to.parallelism}} with groupByFields ${edge.groupByFields}")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala
new file mode 100644
index 0000000..8ebfd7b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/JavaStormBoltWrapper.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.task.{OutputCollector, TopologyContext}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.tuple.{Fields, Tuple}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Storm bolt that runs a [[JavaStormStreamExecutor]] on each incoming tuple.
+ *
+ * Every tuple the executor collects is emitted anchored to the input tuple. The input is acked
+ * once the executor returns. If the executor throws, the error is logged, the input is failed
+ * and the original exception is rethrown.
+ *
+ * @param worker executor invoked with the tuple's Java value list
+ */
+case class JavaStormBoltWrapper(worker : JavaStormStreamExecutor[EagleTuple]) extends BaseRichBolt{
+  val LOG = LoggerFactory.getLogger(classOf[JavaStormBoltWrapper])
+  var _collector : OutputCollector = null
+
+  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
+    _collector = collector
+    worker.init
+  }
+
+  override def execute(input : Tuple): Unit ={
+    try {
+      worker.flatMap(input.getValues, new Collector[EagleTuple](){
+        def collect(t: EagleTuple): Unit ={
+          _collector.emit(input, t.getList.asJava)
+        }
+      })
+    } catch {
+      case ex: Exception =>
+        // Explicitly fail the input so it is not left pending, as StormBoltWrapper does.
+        LOG.error("fail executing", ex)
+        _collector.fail(input)
+        throw ex
+    }
+    _collector.ack(input)
+  }
+
+  /** Declares the field names reported by the worker. */
+  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
+    val fields = worker.fields
+    LOG.info("output fields for worker " + worker + " : " + fields.toList)
+    declarer.declare(new Fields(fields:_*))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala
new file mode 100644
index 0000000..99fa32a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/MapBoltWrapper.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.task.{OutputCollector, TopologyContext}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.tuple.{Fields, Tuple}
+import org.slf4j.LoggerFactory
+
+/**
+ * Storm bolt that applies `fn` to each tuple and emits its result anchored to the input.
+ *
+ * The input's 1 to 4 values are packed into a `scala.TupleN` before calling `fn`. A
+ * `scala.Tuple1` to `scala.Tuple4` result is unpacked into that many output values; any other
+ * result is emitted as a single value.
+ *
+ * @param num number of output fields to declare, named FIELD_PREFIX + index
+ * @param fn  mapping function; `T` is expected to be the `scala.TupleN` matching the input arity
+ * @since 9/29/15
+ */
+case class MapBoltWrapper[T,R](num: Int, fn: T => R) extends BaseRichBolt {
+  val LOG = LoggerFactory.getLogger(classOf[MapBoltWrapper[_, _]])
+  var _collector : OutputCollector = null
+
+  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+    val fields = new util.ArrayList[String]()
+    (0 until num).foreach(i => fields.add(OutputFieldNameConst.FIELD_PREFIX + i))
+    declarer.declare(new Fields(fields))
+  }
+
+  /**
+   * @throws IllegalArgumentException when the input tuple has fewer than 1 or more than 4 values
+   */
+  override def execute(input: Tuple): Unit = {
+    val size = input.size()
+    val values: AnyRef = size match {
+      case 1 => scala.Tuple1(input.getValue(0))
+      case 2 => scala.Tuple2(input.getValue(0), input.getValue(1))
+      case 3 => scala.Tuple3(input.getValue(0), input.getValue(1), input.getValue(2))
+      case 4 => scala.Tuple4(input.getValue(0), input.getValue(1), input.getValue(2), input.getValue(3))
+      case _ => throw new IllegalArgumentException(s"MapBoltWrapper supports 1 to 4 input values, but got $size")
+    }
+    val output = fn(values.asInstanceOf[T])
+    output match {
+      case scala.Tuple1(a) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef]))
+      case scala.Tuple2(a, b) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef]))
+      case scala.Tuple3(a, b, c) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef]))
+      case scala.Tuple4(a, b, c, d) => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef], b.asInstanceOf[AnyRef], c.asInstanceOf[AnyRef], d.asInstanceOf[AnyRef]))
+      case a => _collector.emit(input, util.Arrays.asList(a.asInstanceOf[AnyRef]))
+    }
+    _collector.ack(input)
+  }
+
+  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
+    _collector = collector
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala
new file mode 100644
index 0000000..8b06322
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/NodeNameSelector.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+/**
+ * Chooses the display name of a stream producer.
+ *
+ * @param producer producer whose name is selected
+ */
+case class NodeNameSelector(producer : StreamProducer) {
+  /** The producer's `name`, or its `toString` when `name` is null. */
+  def getName : String = Option(producer.name).getOrElse(producer.toString)
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala
new file mode 100644
index 0000000..64659b7
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/OutputFieldNameConst.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+/** Naming constants for bolt output fields. */
+object OutputFieldNameConst {
+  /** Prefix of positional output field names; bolts declare FIELD_PREFIX + index (e.g. "f0"). */
+  val FIELD_PREFIX: String = "f"
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala
new file mode 100644
index 0000000..aca3b5b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/SpoutProxy.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import backtype.storm.spout.SpoutOutputCollector
+import backtype.storm.task.TopologyContext
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichSpout
+import backtype.storm.tuple.Fields
+
+/**
+ * Declare delegated BaseRichSpout with given field names
+ *
+ * @param delegate delegated BaseRichSpout
+ * @param outputFields given field names
+ */
+case class SpoutProxy(delegate: BaseRichSpout, outputFields: java.util.List[String]) extends BaseRichSpout{
+
+  override def open(conf: java.util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit =
+    delegate.open(conf, context, collector)
+
+  override def nextTuple(): Unit = delegate.nextTuple()
+
+  override def ack(msgId: AnyRef): Unit = delegate.ack(msgId)
+
+  override def fail(msgId: AnyRef): Unit = delegate.fail(msgId)
+
+  /** Forwarded so the delegate is notified of activation, symmetric with `deactivate`. */
+  override def activate(): Unit = delegate.activate()
+
+  override def deactivate(): Unit = delegate.deactivate()
+
+  override def close(): Unit = delegate.close()
+
+  /** Forwarded so component-level configuration declared by the delegate is not hidden by the proxy. */
+  override def getComponentConfiguration(): java.util.Map[String, AnyRef] = delegate.getComponentConfiguration()
+
+  /** Declares `outputFields` in place of the delegate's own output fields. */
+  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit =
+    declarer.declare(new Fields(outputFields))
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala
new file mode 100644
index 0000000..2a2e268
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltFactory.scala
@@ -0,0 +1,47 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+import backtype.storm.topology.base.BaseRichBolt
+import com.typesafe.config.Config
+
+object StormBoltFactory {
+  /**
+   * Creates the Storm bolt that executes `producer`.
+   *
+   * @param graph    owning producer graph (not used by this method)
+   * @param producer a FlatMapProducer (with a Java or Scala storm stream executor), FilterProducer or MapProducer
+   * @param config   passed to a flat-map worker's `prepareConfig` before it is wrapped
+   * @return the bolt wrapping the producer's function or worker
+   * @throws UnsupportedOperationException for any other producer or flat-map worker type
+   */
+  def getBoltWrapper(graph: AbstractStreamProducerGraph, producer : StreamProducer, config : Config) : BaseRichBolt = {
+    producer match {
+      case FlatMapProducer(_, worker) =>
+        worker match {
+          case javaWorker: JavaStormStreamExecutor[EagleTuple @unchecked] =>
+            javaWorker.prepareConfig(config)
+            JavaStormBoltWrapper(javaWorker)
+          case scalaWorker: StormStreamExecutor[EagleTuple @unchecked] =>
+            scalaWorker.prepareConfig(config)
+            StormBoltWrapper(scalaWorker)
+          case other =>
+            throw new UnsupportedOperationException(s"Unsupported flat-map worker $other in producer $producer")
+        }
+      case FilterProducer(_, fn) =>
+        FilterBoltWrapper(fn)
+      case MapProducer(_, n, fn) =>
+        MapBoltWrapper(n, fn)
+      case other =>
+        throw new UnsupportedOperationException(s"Unsupported producer $other")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala
new file mode 100644
index 0000000..7f27483
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormBoltWrapper.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.task.{OutputCollector, TopologyContext}
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.tuple.{Fields, Tuple}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+ * Storm bolt that runs a [[StormStreamExecutor]] on each incoming tuple.
+ *
+ * Every tuple the executor collects is emitted anchored to the input tuple, and the input is
+ * acked afterwards. If the executor throws, the error is logged, the input is failed and the
+ * exception is rethrown wrapped in a RuntimeException.
+ *
+ * @param worker executor invoked with the tuple's values as a Scala sequence
+ */
+case class StormBoltWrapper(worker : StormStreamExecutor[EagleTuple]) extends BaseRichBolt{
+  // classOf names the bolt itself; StormBoltWrapper.getClass would name the companion (StormBoltWrapper$).
+  val LOG = LoggerFactory.getLogger(classOf[StormBoltWrapper])
+  var _collector : OutputCollector = null
+
+  override def prepare(stormConf: util.Map[_, _], context: TopologyContext, collector: OutputCollector): Unit = {
+    _collector = collector
+    worker.init
+  }
+
+  override def execute(input : Tuple): Unit = {
+    try {
+      worker.flatMap(input.getValues.asScala, new Collector[EagleTuple] {
+        override def collect(t: EagleTuple): Unit = {
+          _collector.emit(input, t.getList.asJava)
+        }
+      })
+    } catch {
+      case ex: Exception =>
+        LOG.error("fail executing", ex)
+        _collector.fail(input)
+        throw new RuntimeException(ex)
+    }
+    _collector.ack(input)
+  }
+
+  /** Declares the field names reported by the worker. */
+  override def declareOutputFields(declarer : OutputFieldsDeclarer): Unit ={
+    val fields = worker.fields
+    LOG.info("output fields for worker " + worker + " : " + fields.toList)
+    declarer.declare(new Fields(fields:_*))
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala
new file mode 100644
index 0000000..6ff1d52
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormExecutorForAlertWrapper.scala
@@ -0,0 +1,43 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import java.util
+
+import com.typesafe.config.Config
+
+case class StormExecutorForAlertWrapper(delegate: StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]], streamName: String)
+  extends StormStreamExecutor3[String, String, util.SortedMap[Object, Object]]{
+
+  /** Forwards the configuration to the wrapped executor. */
+  override def prepareConfig(config: Config): Unit = delegate.prepareConfig(config)
+
+  /** Initializes the wrapped executor. */
+  override def init: Unit = delegate.init
+
+  /**
+   * Runs the wrapped executor and re-emits every (key, value) pair it produces as
+   * (key, streamName, value), i.e. with this wrapper's stream name inserted as the middle field.
+   */
+  override def flatMap(input: Seq[AnyRef], collector: Collector[Tuple3[String, String, util.SortedMap[Object, Object]]]): Unit = {
+    val withStreamName = new Collector[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]] {
+      override def collect(pair: Tuple2[String, util.SortedMap[AnyRef, AnyRef]]): Unit =
+        collector.collect(Tuple3(pair.f0, streamName, pair.f1))
+    }
+    delegate.flatMap(input, withStreamName)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala
new file mode 100644
index 0000000..dcb51fd
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormSpoutFactory.scala
@@ -0,0 +1,46 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.topology.base.BaseRichSpout
+import com.typesafe.config.Config
+
+object StormSpoutFactory {
+  /**
+   * Builds the Storm spout for a source producer.
+   *
+   * When the producer declares no output fields (numFields <= 0) its spout is returned as-is;
+   * otherwise the spout is wrapped in a SpoutProxy declaring the field names
+   * FIELD_PREFIX + 0, FIELD_PREFIX + 1, ..., FIELD_PREFIX + (numFields - 1).
+   *
+   * @param config context configuration (not read by this method)
+   * @param sourceProducer source producer
+   * @return the spout to register in the topology
+   */
+  def createSpout(config: Config, sourceProducer: StormSourceProducer) : BaseRichSpout = {
+    val fieldCount = sourceProducer.numFields
+    if (fieldCount > 0) {
+      val fieldNames = new util.ArrayList[String]
+      Iterator.from(0).takeWhile(_ < fieldCount).foreach(idx => fieldNames.add(OutputFieldNameConst.FIELD_PREFIX + idx))
+      SpoutProxy(sourceProducer.source, fieldNames)
+    } else {
+      sourceProducer.source
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala
new file mode 100644
index 0000000..f4129ae
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAG.scala
@@ -0,0 +1,68 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+import scala.collection.JavaConverters._
+import scala.collection.{JavaConversions, mutable}
+
+/**
+ * wrapper of DAG, used for storm topology compiler
+ */
+class StormStreamDAG(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector]) extends AbstractStreamProducerGraph {
+  /** Producer lookup by name; stays null until setNodeMap is called. */
+  var nodeMap: mutable.Map[String, StreamProducer] = null
+
+  /** Adds `streamConnector` as the edge from -> to in the backing graph. */
+  override def addEdge(from: StreamProducer, to: StreamProducer, streamConnector: StreamConnector): Unit = {
+    graph.addEdge(from, to, streamConnector)
+  }
+
+  /** Adds `producer` as a vertex of the backing graph. */
+  override def addVertex(producer: StreamProducer): Unit = {
+    graph.addVertex(producer)
+  }
+
+  /** Iterates the producers of the backing graph (in the graph's own iteration order). */
+  override def iterator(): Iterator[StreamProducer] = graph.iterator().asScala
+
+  /** A producer is a source when no edge points into it. */
+  override def isSource(v: StreamProducer): Boolean = graph.inDegreeOf(v) == 0
+
+  /** Outgoing connectors of `v`, as a Scala view over the backing graph's edge set. */
+  override def outgoingEdgesOf(v: StreamProducer): scala.collection.mutable.Set[StreamConnector] =
+    graph.outgoingEdgesOf(v).asScala
+
+  /**
+   * Looks up a producer by name.
+   *
+   * @return Some(producer) if known; None if the name is unknown or if no node map has been set
+   *         yet (instead of failing with a NullPointerException)
+   */
+  override def getNodeByName(name: String): Option[StreamProducer] =
+    Option(nodeMap).flatMap(_.get(name))
+
+  /** Installs the name -> producer index used by getNodeByName. */
+  def setNodeMap(nodeMap: mutable.Map[String, StreamProducer]): Unit = {
+    this.nodeMap = nodeMap
+  }
+
+  /** Collects the source vertex of every incoming edge of `v` into a new mutable set. */
+  override def incomingVertexOf(v: StreamProducer): scala.collection.mutable.Set[StreamProducer] = {
+    val set = mutable.Set[StreamProducer]()
+    graph.incomingEdgesOf(v).asScala.foreach(e => set += graph.getEdgeSource(e))
+    set
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala
new file mode 100644
index 0000000..254d84b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormStreamDAGTransformer.scala
@@ -0,0 +1,46 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import scala.collection.mutable
+
+/**
+ * convert generic DAG data structure to Storm specific DAG data structure for easy topology compiler
+ */
+object StormStreamDAGTransformer {
+  /**
+   * Wraps a generic producer DAG into a StormStreamDAG and attaches a name -> producer index
+   * built from every vertex of the DAG (a later vertex with the same name overwrites an earlier one).
+   *
+   * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector]
+   * @return StormStreamDAG backed by `dag`, with its node map set
+   */
+  def transform(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]) : StormStreamDAG = {
+    val producersByName = mutable.HashMap[String, StreamProducer]()
+    val vertices = dag.iterator()
+    while (vertices.hasNext) {
+      val producer = vertices.next()
+      producersByName(producer.name) = producer
+    }
+    val stormDAG = new StormStreamDAG(dag)
+    stormDAG.setNodeMap(producersByName)
+    stormDAG
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
new file mode 100644
index 0000000..dbe69d2
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyCompiler.scala
@@ -0,0 +1,98 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+import java.util
+
+import backtype.storm.topology.base.BaseRichBolt
+import backtype.storm.topology.{BoltDeclarer, TopologyBuilder}
+import backtype.storm.tuple.Fields
+import com.typesafe.config.Config
+import org.slf4j.LoggerFactory
+
+case class StormTopologyCompiler(config: Config, graph: AbstractStreamProducerGraph) extends AbstractTopologyCompiler{
+  val LOG = LoggerFactory.getLogger(StormTopologyCompiler.getClass)
+  // bolts already built per producer, so a repeated lookup returns the same instance
+  val boltCache = scala.collection.mutable.Map[StreamProducer, StormBoltWrapper]()
+
+  /**
+   * Compiles the producer graph into a Storm topology.
+   *
+   * Source producers become spouts. Every outgoing edge from -> to connects `from` to the bolt
+   * named after `to`. That bolt is created and declared once, with the target producer's
+   * parallelism (1 if the name is not in the graph). Edges without group-by fields use shuffle
+   * grouping; the others use fields grouping on the corresponding output field names.
+   *
+   * @return executor that submits the built topology
+   * @throws IllegalArgumentException if an edge targets a producer name unknown to the graph
+   */
+  override def buildTopology: AbstractTopologyExecutor = {
+    val builder = new TopologyBuilder()
+    val iter = graph.iterator()
+    // one BoltDeclarer per component name, so a bolt with several upstream edges is declared only once
+    val boltDeclarerCache = scala.collection.mutable.Map[String, BoltDeclarer]()
+    while (iter.hasNext) {
+      val from = iter.next()
+      val fromName = from.name
+      if (graph.isSource(from)) {
+        val spout = StormSpoutFactory.createSpout(config, from.asInstanceOf[StormSourceProducer])
+        builder.setSpout(fromName, spout, from.parallelism)
+        LOG.info("Spout name : " + fromName + " with parallelism " + from.parallelism)
+      } else {
+        LOG.info("Bolt name:" + fromName)
+      }
+
+      graph.outgoingEdgesOf(from).foreach(sc => {
+        val toName = sc.to.name
+        val boltDeclarer: BoltDeclarer = boltDeclarerCache.get(toName) match {
+          case Some(existing) => existing
+          case None =>
+            // build the bolt only when its component is declared for the first time
+            val toBolt = createBoltIfAbsent(toName)
+            val finalParallelism = graph.getNodeByName(toName).map(_.parallelism).getOrElse(1)
+            val declarer = builder.setBolt(toName, toBolt, finalParallelism)
+            LOG.info("created bolt " + toName + " with parallelism " + finalParallelism)
+            boltDeclarerCache.put(toName, declarer)
+            declarer
+        }
+        if (sc.groupByFields.isEmpty) {
+          boltDeclarer.shuffleGrouping(fromName)
+        } else {
+          boltDeclarer.fieldsGrouping(fromName, new Fields(fields(sc.groupByFields)))
+        }
+        LOG.info("bolt connected " + fromName + "->" + toName + " with groupby fields " + sc.groupByFields)
+      })
+    }
+    new StormTopologyExecutorImpl(builder.createTopology, config)
+  }
+
+  /** Maps field indexes to output field names (FIELD_PREFIX + index), preserving their order. */
+  def fields(fields : Seq[Int]): java.util.List[String] = {
+    val ret = new util.ArrayList[String]
+    fields.foreach(n => ret.add(OutputFieldNameConst.FIELD_PREFIX + n))
+    ret
+  }
+
+  /**
+   * Returns the bolt for the producer called `name`.
+   *
+   * @throws IllegalArgumentException if no producer with that name exists in the graph
+   */
+  def createBoltIfAbsent(name : String) : BaseRichBolt = {
+    graph.getNodeByName(name) match {
+      case Some(p) => createBoltIfAbsent(graph, p)
+      case None => throw new IllegalArgumentException("please check bolt name " + name)
+    }
+  }
+
+  /**
+   * Returns the cached bolt for `producer`, or builds one through StormBoltFactory.
+   * Newly built StormBoltWrapper instances are stored in boltCache. Other bolt types are not
+   * cached, because the cache only holds StormBoltWrapper values.
+   */
+  def createBoltIfAbsent(graph: AbstractStreamProducerGraph, producer : StreamProducer): BaseRichBolt = {
+    boltCache.get(producer) match {
+      case Some(bolt) => bolt
+      case None =>
+        val bolt = StormBoltFactory.getBoltWrapper(graph, producer, config)
+        bolt match {
+          case wrapper: StormBoltWrapper => boltCache.put(producer, wrapper)
+          case _ =>
+        }
+        bolt
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
new file mode 100644
index 0000000..4cecb85
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StormTopologyExecutorImpl.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import backtype.storm.generated.StormTopology
+import backtype.storm.utils.Utils
+import backtype.storm.{Config, LocalCluster, StormSubmitter}
+import storm.trident.spout.RichSpoutBatchExecutor
+
+case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesafe.config.Config) extends AbstractTopologyExecutor {
+  /**
+   * Submits `topology` to Storm under the name given by "envContextConfig.topologyName".
+   *
+   * Local mode is selected when "envContextConfig.mode" equals "local" (ignoring case). The topology
+   * then runs in an in-process LocalCluster, and this call blocks for Integer.MAX_VALUE
+   * milliseconds (about 24.8 days) before killing the topology and shutting the cluster down.
+   * Any other mode submits to the remote cluster with StormSubmitter.submitTopologyWithProgressBar.
+   * The same batch/buffer settings are applied in both cases.
+   */
+  @throws(classOf[Exception])
+  def execute: Unit = {
+    val mode = config.getString("envContextConfig.mode")
+    val localMode: Boolean = mode.equalsIgnoreCase("local")
+
+    val stormConf: Config = new Config
+    // spout batch size plus receiver/transfer/executor buffer sizes
+    Seq(
+      RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF -> Int.box(64 * 1024),
+      Config.TOPOLOGY_RECEIVER_BUFFER_SIZE -> Int.box(8),
+      Config.TOPOLOGY_TRANSFER_BUFFER_SIZE -> Int.box(32),
+      Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE -> Int.box(16384),
+      Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE -> Int.box(16384)
+    ).foreach { case (key, value) => stormConf.put(key, value) }
+
+    val topologyName = config.getString("envContextConfig.topologyName")
+    if (localMode) {
+      val cluster: LocalCluster = new LocalCluster
+      cluster.submitTopology(topologyName, stormConf, topology)
+      // keep the local topology running (effectively indefinitely) before tearing it down
+      Utils.sleep(Integer.MAX_VALUE)
+      cluster.killTopology(topologyName)
+      cluster.shutdown()
+    } else {
+      StormSubmitter.submitTopologyWithProgressBar(topologyName, stormConf, topology)
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
new file mode 100644
index 0000000..fa83e6d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAlertExpansion.scala
@@ -0,0 +1,192 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import java.util
+
+import com.typesafe.config.Config
+import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl
+import org.apache.eagle.executor.AlertExecutorCreationUtils
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * The constraints for alert is:
+ * 1. only 3 kinds of StreamProducer can be put immediately before an AlertStreamSink: MapProducer, FlatMapProducer and StreamUnionProducer (a source producer with no upstream is also accepted)
+ * 2. For StreamUnionProducer, the only supported unioned producers are MapProducer and FlatMapProducer
+ * 3. the output of MapProducer and FlatMapProducer is a 2-field tuple (key, value): key is a String and value has to be a SortedMap
+ * 4. the framework wraps the original MapProducer and FlatMapProducer so that they emit a 3-field tuple {key, streamName, value}
+ * 5. the framework will automatically partition traffic with first field
+ *
+ *
+ * 2 steps
+ * step 1: wrap the previous StreamProducer so that it emits one more field, "streamName"
+ * step 2: partition the alert executor by its policy partitioner class
+ */
+
+class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
+  val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion])
+
+  /**
+   * Rewrites every edge that feeds an AlertStreamSink (see onIteration).
+   *
+   * The edges to add and the vertices to remove are only collected while the DAG is walked.
+   * They are applied after the walk has finished.
+   *
+   * @param dag graph to rewrite; it is modified directly
+   */
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit = {
+    val iter = dag.iterator()
+    val toBeAddedEdges = new ListBuffer[StreamConnector]
+    val toBeRemovedVertex = new ListBuffer[StreamProducer]
+    while (iter.hasNext) {
+      val current = iter.next()
+      dag.outgoingEdgesOf(current).foreach(edge => {
+        val child = edge.to
+        onIteration(toBeAddedEdges, toBeRemovedVertex, dag, current, child)
+      })
+    }
+    // add back edges, registering both endpoints as vertices first
+    toBeAddedEdges.foreach(e => {
+      dag.addVertex(e.from)
+      dag.addVertex(e.to)
+      dag.addEdge(e.from, e.to, e)
+    })
+    // then remove the replaced producers and the AlertStreamSink placeholders
+    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
+  }
+
+  /**
+   * Handles one edge `current -> child`. Only edges whose child is an AlertStreamSink are rewritten:
+   *  - step 1: the producer(s) feeding the sink are replaced by wrappers that add the upstream
+   *    stream name as the middle field of every emitted tuple;
+   *  - step 2: one FlatMapProducer is created per alert executor returned by
+   *    AlertExecutorCreationUtils.createAlertExecutors. Each one is connected from every wrapper,
+   *    grouped on field 0;
+   *  - the AlertStreamSink is scheduled for removal. Alert consumers are set up when the sink was
+   *    declared withConsumer.
+   *
+   * @throws IllegalStateException if `current` is not a StreamUnionProducer, FlatMapProducer,
+   *                               MapProducer or a source producer (no incoming edges)
+   */
+  def onIteration(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer],
+                  dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, child: StreamProducer): Unit = {
+    child match {
+      case AlertStreamSink(id, upStreamNames, alertExecutorId, withConsumer) => {
+        /**
+         * step 1: wrap previous StreamProducer with one more field "streamName"
+         * for AlertStreamSink, we check previous StreamProducer and replace that
+         */
+        val newStreamProducers = new ListBuffer[StreamProducer]
+        current match {
+          case StreamUnionProducer(id, others) => {
+            // producers feeding the union directly get the first stream name,
+            // the unioned producers (others) get the following names in order
+            val incomingEdges = dag.incomingEdgesOf(current)
+            incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0)))
+            var i: Int = 1
+            others.foreach(o => {
+              newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i))
+              i += 1
+            })
+          }
+          case _: FlatMapProducer[AnyRef, AnyRef] => {
+            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+          }
+          case _: MapProducer => {
+            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+          }
+          case s: StreamProducer if dag.inDegreeOf(s) == 0 => {
+            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+          }
+          case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported")
+        }
+
+        /**
+         * step 2: partition alert executor by policy partitioner class
+         */
+        val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, new AlertDefinitionDAOImpl(config), upStreamNames, alertExecutorId)
+        val alertProducers = new scala.collection.mutable.MutableList[StreamProducer]
+        alertExecutors.foreach(exec => {
+          val t = FlatMapProducer(UniqueId.incrementAndGetId(), exec).withName(exec.getAlertExecutorId() + "_" + exec.getPartitionSeq())
+          t.setConfig(config)
+          t.setGraph(dag)
+          alertProducers += t
+          // every wrapped upstream feeds every partition, grouped by the first field (the key)
+          newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector(newsp, t).groupBy(Seq(0)))
+        })
+
+        // remove AlertStreamSink
+        toBeRemovedVertex += child
+
+        // add alert consumer if necessary
+        if (withConsumer) {
+          AlertExecutorConsumerUtils.setupAlertConsumers(toBeAddedEdges, alertProducers.toList)
+        }
+      }
+      case _ =>
+    }
+  }
+
+  /**
+   * Replaces `current` with a producer that emits (key, upStreamName, value) tuples, and returns it.
+   *  - FlatMapProducer: its executor is wrapped (JavaStormExecutorForAlertWrapper or
+   *    StormExecutorForAlertWrapper). The new producer takes over all incoming and outgoing edges,
+   *    and the old one is scheduled for removal.
+   *  - MapProducer with 2 output fields: its function is wrapped to turn (x1, x2) into
+   *    (x1, upStreamName, x2), and the edges are taken over as above.
+   *  - source producer (no incoming edges): it is kept, and a 3-field MapProducer is inserted after it.
+   *
+   * @throws IllegalArgumentException for any other producer, an unsupported executor type, or a
+   *                                  MapProducer that does not have 2 output fields
+   */
+  private def replace(toBeAddedEdges: ListBuffer[StreamConnector], toBeRemovedVertex: ListBuffer[StreamProducer],
+                      dag: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, upStreamName: String) : StreamProducer = {
+    var newsp: StreamProducer = null
+    current match {
+      case _: FlatMapProducer[AnyRef, AnyRef] => {
+        val mapper = current.asInstanceOf[FlatMapProducer[_, _]].mapper
+        mapper match {
+          case a: JavaStormStreamExecutor[EagleTuple] => {
+            val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
+            newsp = FlatMapProducer(UniqueId.incrementAndGetId(), newmapper)
+            newsp.setGraph(dag)
+            newsp.setConfig(config)
+          }
+          case b: StormStreamExecutor[EagleTuple] => {
+            val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
+            newsp = FlatMapProducer(UniqueId.incrementAndGetId(), newmapper)
+            newsp.setGraph(dag)
+            newsp.setConfig(config)
+          }
+          case _ => throw new IllegalArgumentException
+        }
+        // remove old StreamProducer and replace that with new StreamProducer
+        val incomingEdges = dag.incomingEdgesOf(current)
+        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
+        val outgoingEdges = dag.outgoingEdgesOf(current)
+        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to))
+        toBeRemovedVertex += current
+      }
+      case _: MapProducer => {
+        val mapper = current.asInstanceOf[MapProducer].fn
+        val newfun: (AnyRef => AnyRef) = {
+          a => mapper(a) match {
+            case scala.Tuple2(x1, x2) => (x1, upStreamName, x2)
+            case _ => throw new IllegalArgumentException
+          }
+        }
+        // NOTE(review): unlike the FlatMapProducer branch, setGraph/setConfig are not called on
+        // this new producer; confirm that this is intended
+        current match {
+          case MapProducer(id, 2, fn) => newsp = MapProducer(UniqueId.incrementAndGetId(), 3, newfun)
+          case _ => throw new IllegalArgumentException
+        }
+        val incomingEdges = dag.incomingEdgesOf(current)
+        incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
+        val outgoingEdges = dag.outgoingEdgesOf(current)
+        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to))
+        toBeRemovedVertex += current
+      }
+      case s: StreamProducer if dag.inDegreeOf(s) == 0 => {
+        // normalize every element emitted by the source into (key, upStreamName, value)
+        val fn: (AnyRef => AnyRef) = {
+          n => {
+            n match {
+              // already a 3-field tuple: pass it through unchanged. This must be a type pattern;
+              // matching against the Tuple3 companion object never succeeds for a tuple value.
+              case _: scala.Tuple3[_, _, _] => n
+              case scala.Tuple2(x1, x2) => (x1, upStreamName, x2)
+              case scala.Tuple1(x1) => (if (x1 == null) null else x1.hashCode(), upStreamName, x1)
+              case _ => (if (n == null) null else n.hashCode(), upStreamName, n)
+            }
+          }
+        }
+        newsp = MapProducer(UniqueId.incrementAndGetId(), 3, fn)
+        // the source stays in the graph; the new MapProducer is inserted right after it
+        toBeAddedEdges += StreamConnector(current, newsp)
+        val outgoingEdges = dag.outgoingEdgesOf(current)
+        outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp, e.to))
+      }
+      case _ => throw new IllegalArgumentException("Only FlatMapProducer and MapProducer can be replaced before AlertStreamSink")
+    }
+    newsp
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala
new file mode 100644
index 0000000..c6a3d3d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamAppDSL.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import backtype.storm.topology.base.BaseRichSpout
+import com.typesafe.config._
+import org.apache.eagle.dataproc.impl.storm.AbstractStormSpoutProvider
+import org.apache.eagle.dataproc.util.ConfigOptionParser
+
+import scala.reflect.runtime.universe._
+
+/**
+ * @since 11/6/15
+ */
+trait ConfigContext{
+  def set(config:Config)
+  def config:Config
+
+  /**
+   * Sets `key` to `value` (converted with ConfigValueFactory.fromAnyRef) and stores the resulting
+   * config through set(config).
+   */
+  def set[T<:AnyRef](key:String,value:T): Unit = {
+    set(config.withValue(key,ConfigValueFactory.fromAnyRef(value)))
+  }
+
+  /**
+   * Reads `key` as type T, or returns `default` when the path is absent.
+   *
+   * @param key config key
+   * @param default value returned when `key` is not present (null if omitted)
+   * @tparam T return type
+   * @return the configured value, or `default`
+   */
+  def get[T](key:String,default:T=null)(implicit tag:TypeTag[T]):T = {
+    if(config.hasPath(key)) {
+      get(key)
+    } else default
+  }
+
+  /**
+   * Reads `key` as type T, choosing the Typesafe Config accessor from T's type tag.
+   *
+   * Types are compared with `=:=` (type equivalence). The primitive cases compare against
+   * `TypeTag.X.tpe`, which is a Type like `tag.tpe`. List types are converted from the Java lists
+   * returned by Typesafe Config into immutable Scala lists of the requested element type.
+   *
+   * Byte is not supported: Config.getBytes returns a memory size as a java.lang.Long (per the
+   * Typesafe Config docs), which cannot be handed back as a Byte.
+   *
+   * @throws UnsupportedOperationException if T is not one of the supported types
+   */
+  def get[T](key:String)(implicit tag: TypeTag[T]):T = {
+    import scala.collection.JavaConverters._
+    tag.tpe match {
+      case t if t =:= STRING_TYPE => config.getString(key).asInstanceOf[T]
+      case t if t =:= TypeTag.Double.tpe => config.getDouble(key).asInstanceOf[T]
+      case t if t =:= TypeTag.Long.tpe => config.getLong(key).asInstanceOf[T]
+      case t if t =:= TypeTag.Int.tpe => config.getInt(key).asInstanceOf[T]
+      case t if t =:= TypeTag.Boolean.tpe => config.getBoolean(key).asInstanceOf[T]
+      case t if t =:= NUMBER_TYPE => config.getNumber(key).asInstanceOf[T]
+      case t if t =:= OBJECT_TYPE => config.getObject(key).asInstanceOf[T]
+      case t if t =:= VALUE_TYPE => config.getValue(key).asInstanceOf[T]
+      case t if t =:= ANY_REF_TYPE => config.getAnyRef(key).asInstanceOf[T]
+      case t if t =:= INT_LIST_TYPE => config.getIntList(key).asScala.map(_.intValue).toList.asInstanceOf[T]
+      case t if t =:= DOUBLE_LIST_TYPE => config.getDoubleList(key).asScala.map(_.doubleValue).toList.asInstanceOf[T]
+      case t if t =:= BOOL_LIST_TYPE => config.getBooleanList(key).asScala.map(_.booleanValue).toList.asInstanceOf[T]
+      case t if t =:= LONG_LIST_TYPE => config.getLongList(key).asScala.map(_.longValue).toList.asInstanceOf[T]
+      case _ => throw new UnsupportedOperationException(s"$tag is not supported yet")
+    }
+  }
+
+  val STRING_TYPE = typeOf[String]
+  val NUMBER_TYPE = typeOf[Number]
+  val INT_LIST_TYPE = typeOf[List[Int]]
+  val BOOL_LIST_TYPE = typeOf[List[Boolean]]
+  val DOUBLE_LIST_TYPE = typeOf[List[Double]]
+  val LONG_LIST_TYPE = typeOf[List[Long]]
+  val OBJECT_TYPE = typeOf[ConfigObject]
+  val VALUE_TYPE = typeOf[ConfigValue]
+  val ANY_REF_TYPE = typeOf[AnyRef]
+}
+
+/**
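+ * Stream application DSL: an App entry point bound to an execution environment.
+ * @tparam E type of the execution environment the application runs on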
+ */
+trait StreamApp[+E<:ExecutionEnvironment] extends App with ConfigContext{
+ // set once execute() has run, so main() does not execute the environment a second time
+ private var _executed = false
+ // config loaded from the command-line arguments in main()
+ private var _config:Config = null
+
+ override def config:Config = _config
+
+ override def set(config:Config) = _config = config
+
+ /** Execution environment the application's streams are built on. */
+ def env:E
+
+ /** Executes the environment and records that it has been executed. */
+ def execute() {
+ env.execute()
+ _executed = true
+ }
+
+ /**
+ * Entry point. Loads the config from `args` via ConfigOptionParser, then runs the application
+ * body (super.main). Finally it executes the environment if the body did not call execute() itself.
+ * The order matters: the config is loaded before the body runs, so `config` is available to it
+ * (for example, `env` and `source` in StormStreamApp read `config`).
+ */
+ override def main(args: Array[String]): Unit = {
+ _config = new ConfigOptionParser().load(args)
+ super.main(args)
+ if(!_executed) execute()
+ }
+}
+
+trait StormStreamApp extends StreamApp[StormExecutionEnvironment]{
+  private var _env:StormExecutionEnvironment = null
+
+  /**
+   * Creates a new source in the Storm environment. The spout is obtained from `sourceProvider`
+   * (built with this app's config) before the environment is accessed.
+   */
+  def source(sourceProvider: AbstractStormSpoutProvider) = {
+    val providedSpout = sourceProvider.getSpout(config)
+    env.newSource(providedSpout)
+  }
+
+  /** Creates a new source in the Storm environment from an existing spout. */
+  def source(spout:BaseRichSpout) = {
+    val environment = env
+    environment.newSource(spout)
+  }
+
+  /**
+   * Storm execution environment, created on first access from the current config through
+   * ExecutionEnvironmentFactory.getStorm and reused afterwards.
+   */
+  override def env:StormExecutionEnvironment = {
+    if (_env eq null) _env = ExecutionEnvironmentFactory.getStorm(config)
+    _env
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
new file mode 100644
index 0000000..0cece47
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamConnector.scala
@@ -0,0 +1,28 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+case class StreamConnector(from: StreamProducer, to: StreamProducer) {
+  /**
+   * Indexes of the tuple fields used to partition traffic along this edge; empty (Nil) means no
+   * field grouping was requested. Being a body var, it is not part of the case-class equality.
+   */
+  var groupByFields : Seq[Int] = Nil
+
+  /** Sets the group-by field indexes and returns this same connector, so calls can be chained. */
+  def groupBy(fields : Seq[Int]) : StreamConnector = {
+    this.groupByFields = fields
+    this
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala
new file mode 100644
index 0000000..7e15233
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamDAGExpansion.scala
@@ -0,0 +1,27 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+/**
+ * A transformation applied in place to the stream DAG, whose vertices are StreamProducers
+ * and whose edges are StreamConnectors. Examples are folding GroupBy or Union vertices
+ * into edges, naming producers, or applying configured parallelism.
+ *
+ * @param config context configuration; each subclass forwards its own `config` here
+ */
+abstract class StreamDAGExpansion(config: Config) {
+  /**
+   * Rewrites `dag` in place. The result is `Unit`, so implementations communicate their
+   * changes only by mutating the graph.
+   * @param dag the graph to transform
+   */
+  def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
new file mode 100644
index 0000000..42bc9a8
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamGroupbyExpansion.scala
@@ -0,0 +1,59 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * DAG expansion that replaces every [[GroupByProducer]] vertex with group-by information on edges.
+ *
+ * Consider each edge `current -> g`, where `g` is a GroupByProducer, together with each edge
+ * `g -> child`. For every such pair, a new edge `current -> child` is added, carrying
+ * `g.fields` as its group-by fields. The GroupByProducer vertices are then removed.
+ *
+ * @param config context configuration
+ */
+class StreamGroupbyExpansion(config: Config) extends StreamDAGExpansion(config){
+  val LOG = LoggerFactory.getLogger(classOf[StreamGroupbyExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit = {
+    // Changes are buffered during the traversal and applied afterwards, rather than
+    // mutating the graph while dag.iterator() is walking it.
+    val toBeAddedEdges = new ListBuffer[StreamConnector]
+    val toBeRemovedVertex = new ListBuffer[StreamProducer]
+
+    val iter = dag.iterator()
+    while (iter.hasNext) {
+      val current = iter.next()
+      dag.outgoingEdgesOf(current).foreach { edge =>
+        edge.to match {
+          case groupByVertex: GroupByProducer =>
+            dag.outgoingEdgesOf(groupByVertex).foreach { downstream =>
+              toBeAddedEdges += StreamConnector(current, downstream.to).groupBy(groupByVertex.fields)
+            }
+            toBeRemovedVertex += groupByVertex
+          case _ =>
+        }
+      }
+    }
+
+    // Add the bypass edges while the GroupByProducer vertices still exist, then drop those
+    // vertices. Per the JGraphT API, removeVertex also removes the vertex's touching edges.
+    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
+    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala
new file mode 100644
index 0000000..5208a97
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamNameExpansion.scala
@@ -0,0 +1,41 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+
+/**
+ * DAG expansion that overwrites the `name` of every StreamProducer in the graph with
+ * `NodeNameSelector(producer).getName`.
+ *
+ * Intended naming rule:
+ *  1. if a name was given programmatically (e.g. via `withName`), that name is kept;
+ *  2. otherwise a generated name is used.
+ * NOTE(review): the rule itself lives in NodeNameSelector, which is defined elsewhere;
+ * confirm it honours both points.
+ */
+class StreamNameExpansion(config: Config) extends StreamDAGExpansion(config){
+  val LOG = LoggerFactory.getLogger(classOf[StreamNameExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit =
+    assignNames(dag.iterator())
+
+  /** Consumes the remaining vertices one by one, assigning each its selected name. */
+  @scala.annotation.tailrec
+  private def assignNames(remaining: java.util.Iterator[StreamProducer]): Unit =
+    if (remaining.hasNext) {
+      val producer = remaining.next()
+      producer.name = NodeNameSelector(producer).getName
+      assignNames(remaining)
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala
new file mode 100644
index 0000000..0264ca6
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamParallelismConfigExpansion.scala
@@ -0,0 +1,55 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import java.util.regex.Pattern
+
+import com.typesafe.config.{ConfigValue, ConfigObject, Config}
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConverters._
+
+/**
+ * DAG expansion that sets StreamProducer parallelism from configuration.
+ *
+ * It reads the object at `envContextConfig.parallelismConfig`, whose keys are regular
+ * expressions and whose values are integers. Every named producer whose name contains a match
+ * for a key gets that key's value as its parallelism. Matching uses `Matcher.find`, so the
+ * pattern is not anchored. When the path is absent, all producers keep their current parallelism.
+ */
+class StreamParallelismConfigExpansion(config: Config) extends StreamDAGExpansion(config){
+  val LOG = LoggerFactory.getLogger(classOf[StreamParallelismConfigExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit = {
+    val map = getParallelismMap(config)
+    val iter = dag.iterator()
+    while (iter.hasNext) {
+      val streamProducer = iter.next()
+      // Unnamed producers cannot be matched against the name patterns.
+      if (streamProducer.name != null) {
+        // NOTE(review): if several patterns match, the last one visited wins. Map iteration
+        // order is unspecified, so overlapping patterns give an arbitrary result.
+        map.foreach { case (pattern, parallelism) =>
+          if (pattern.matcher(streamProducer.name).find()) streamProducer.parallelism = parallelism
+        }
+      }
+    }
+  }
+
+  /**
+   * Compiles `envContextConfig.parallelismConfig` into (name pattern -> parallelism) pairs.
+   * Returns an empty map when the path is not configured. `Config.getObject` throws
+   * ConfigException.Missing for absent paths, so `hasPath` must be checked first.
+   */
+  private def getParallelismMap(config: Config) : Map[Pattern, Int]= {
+    val path = "envContextConfig.parallelismConfig"
+    if (!config.hasPath(path)) {
+      LOG.info("Found parallelismConfig ? no")
+      Map.empty
+    } else {
+      LOG.info("Found parallelismConfig ? yes")
+      val parallelismConfig: ConfigObject = config.getObject(path)
+      parallelismConfig.asScala.toMap.map { case (name, value) =>
+        (Pattern.compile(name), toParallelism(name, value))
+      }
+    }
+  }
+
+  /** Converts one configured value to an Int, failing with a descriptive message if it is not an Integer. */
+  private def toParallelism(name: String, value: ConfigValue): Int = value.unwrapped() match {
+    case n: java.lang.Integer => n.intValue()
+    case other => throw new IllegalArgumentException(
+      "parallelismConfig value for '" + name + "' must be an integer, but was: " + other)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
new file mode 100644
index 0000000..9fb3e22
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamProducer.scala
@@ -0,0 +1,193 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+package org.apache.eagle.datastream
+
+import java.util
+import java.util.concurrent.atomic.AtomicInteger
+
+import backtype.storm.topology.base.BaseRichSpout
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+/**
+ * Base trait for every concrete StreamProducer: the high-level API users chain together to
+ * describe a data-stream flow.
+ *
+ * Each operator call does the following:
+ *  - creates a new producer with a fresh id;
+ *  - adds it to the shared DAG with an edge from `this`;
+ *  - copies `graph` and `config` onto it.
+ * Operators other than `alert` return the new producer.
+ *
+ * StreamProducer itself is independent of any execution environment.
+ */
+trait StreamProducer{
+  var name: String = null
+  var parallelism: Int = 1
+  var graph: DirectedAcyclicGraph[StreamProducer, StreamConnector] = null
+  var config: Config = null
+
+  private def incrementAndGetId() = UniqueId.incrementAndGetId()
+
+  def setGraph(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit = this.graph = graph
+  def getGraph: DirectedAcyclicGraph[StreamProducer, StreamConnector] = graph
+  def setConfig(config: Config) : Unit = this.config = config
+  def getConfig: Config = config
+
+  /** Hooks `next` into the DAG downstream of this producer and returns it. */
+  private def connect[P <: StreamProducer](next: P): P = {
+    hookupDAG(graph, this, next)
+    next
+  }
+
+  /** Throws IllegalArgumentException if any field index is negative. */
+  private def validateFieldIndices(fields: Seq[Int]): Unit =
+    if (fields.exists(_ < 0)) throw new IllegalArgumentException("field index should be always >= 0")
+
+  /** Appends a FilterProducer holding predicate `fn`. */
+  def filter(fn : AnyRef => Boolean): StreamProducer =
+    connect(FilterProducer(incrementAndGetId(), fn))
+
+  /** Appends a FlatMapProducer holding `mapper`. */
+  def flatMap[T, R](mapper : FlatMapper[T, R]) : StreamProducer =
+    connect(FlatMapProducer(incrementAndGetId(), mapper))
+
+  /** Appends a MapProducer whose `numOutputFields` is 1. */
+  def map1(fn : AnyRef => AnyRef) : StreamProducer = connect(MapProducer(incrementAndGetId(), 1, fn))
+
+  /** Appends a MapProducer whose `numOutputFields` is 2. */
+  def map2(fn : AnyRef => AnyRef) : StreamProducer = connect(MapProducer(incrementAndGetId(), 2, fn))
+
+  /** Appends a MapProducer whose `numOutputFields` is 3. */
+  def map3(fn : AnyRef => AnyRef) : StreamProducer = connect(MapProducer(incrementAndGetId(), 3, fn))
+
+  /** Appends a MapProducer whose `numOutputFields` is 4. */
+  def map4(fn : AnyRef => AnyRef) : StreamProducer = connect(MapProducer(incrementAndGetId(), 4, fn))
+
+  /**
+   * Appends a GroupByProducer. Field indices start from 0. StreamGroupbyExpansion later folds
+   * the grouping onto the graph's edges.
+   * @throws IllegalArgumentException if any index is negative
+   */
+  def groupBy(fields : Int*) : StreamProducer = {
+    validateFieldIndices(fields)
+    connect(GroupByProducer(incrementAndGetId(), fields))
+  }
+
+  /**
+   * Java-friendly overload of groupBy, validated exactly like the Scala one (each index >= 0).
+   * NOTE(review): confirm whether Java callers are meant to pass 0-based indices, as validated
+   * here, or 1-based ones.
+   * @throws IllegalArgumentException if any index is negative
+   */
+  def groupBy(fields : java.util.List[Integer]) : StreamProducer = {
+    // Copy into an immutable Seq of primitive Ints. A cast of the live asScala view would let
+    // later changes to the caller's list silently alter this operator's fields.
+    val indices: Seq[Int] = fields.asScala.map(_.intValue).toList
+    validateFieldIndices(indices)
+    connect(GroupByProducer(incrementAndGetId(), indices))
+  }
+
+  /** Appends a StreamUnionProducer of this stream with `others`; StreamUnionExpansion later removes it. */
+  def streamUnion(others : Seq[StreamProducer]) : StreamProducer =
+    connect(StreamUnionProducer(incrementAndGetId(), others))
+
+  /**
+   * An alert is always a sink of the data flow: these methods return Unit, so nothing can be
+   * chained after them.
+   */
+  def alertWithConsumer(upStreamNames: util.List[String], alertExecutorId : String): Unit =
+    alert(upStreamNames, alertExecutorId, true)
+
+  def alertWithoutConsumer(upStreamNames: util.List[String], alertExecutorId : String): Unit =
+    alert(upStreamNames, alertExecutorId, false)
+
+  /** Appends an AlertStreamSink for `upStreamNames`, handled by `alertExecutorId`. */
+  def alert(upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true): Unit =
+    hookupDAG(graph, this, AlertStreamSink(incrementAndGetId(), upStreamNames, alertExecutorId, withConsumer))
+
+  def alertWithConsumer(upStreamName: String, alertExecutorId : String): Unit =
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, true)
+
+  def alertWithoutConsumer(upStreamName: String, alertExecutorId : String): Unit =
+    alert(util.Arrays.asList(upStreamName), alertExecutorId, false)
+
+  /**
+   * Adds `next` as a vertex of `current`'s graph and connects `current -> next` with a plain
+   * StreamConnector. It then copies `current`'s graph and config onto `next`.
+   * NOTE(review): the `graph` argument is not used; the graph is always taken from `current`.
+   */
+  def hookupDAG(graph: DirectedAcyclicGraph[StreamProducer, StreamConnector], current: StreamProducer, next: StreamProducer): Unit = {
+    current.getGraph.addVertex(next)
+    current.getGraph.addEdge(current, next, StreamConnector(current, next))
+    passOnContext(current, next)
+  }
+
+  private def passOnContext(current: StreamProducer, next: StreamProducer): Unit ={
+    next.graph = current.graph
+    next.config = current.config
+  }
+
+  /**
+   * Sets parallelism programmatically. StreamParallelismConfigExpansion may later override
+   * it from configuration.
+   */
+  def withParallelism(parallelism : Int) : StreamProducer = {
+    this.parallelism = parallelism
+    this
+  }
+
+  /** Sets this producer's name; returns this producer for chaining. */
+  def withName(name : String) : StreamProducer = {
+    this.name = name
+    this
+  }
+}
+
+/** Vertex created by `StreamProducer.filter`; `fn` is the predicate supplied by the user. */
+case class FilterProducer(id: Int, fn : AnyRef => Boolean) extends StreamProducer
+
+/** Vertex created by `StreamProducer.flatMap`; its string form is `<mapper>_<id>`. */
+case class FlatMapProducer[T, R](id: Int, var mapper: FlatMapper[T, R]) extends StreamProducer {
+  override def toString() = mapper.toString + "_" + id
+}
+
+/** Vertex created by `map1`..`map4`; `numOutputFields` records which variant (1-4) was called. */
+case class MapProducer(id: Int, numOutputFields : Int, var fn : AnyRef => AnyRef) extends StreamProducer
+
+/**
+ * Vertex created by `groupBy`; `fields` are the validated (>= 0) field indices.
+ * StreamGroupbyExpansion removes this vertex and moves `fields` onto the bypassing edges.
+ */
+case class GroupByProducer(id: Int, fields : Seq[Int]) extends StreamProducer
+
+/**
+ * Vertex created by `streamUnion`; `others` are the producers unioned with the upstream one.
+ * StreamUnionExpansion removes this vertex and wires `others` directly to its children.
+ */
+case class StreamUnionProducer(id: Int, others: Seq[StreamProducer]) extends StreamProducer
+
+/**
+ * Producer wrapping a Storm spout (`source`).
+ * `numFields` defaults to 0 and is set through [[renameOutputFields]].
+ */
+case class StormSourceProducer(id: Int, source : BaseRichSpout) extends StreamProducer{
+  var numFields : Int = 0
+
+  /**
+   * Asks for the spout's output fields to be renamed to f0, f1, f2, ...; any field names the
+   * spout declares itself would be replaced. This method only records the count in
+   * `numFields`. NOTE(review): the renaming itself is presumably applied by the executor.
+   * @param n number of output fields
+   * @return this producer, for chaining
+   */
+  def renameOutputFields(n : Int): StormSourceProducer = { numFields = n; this }
+}
+
+/**
+ * Terminal vertex created by `StreamProducer.alert` and its alertWithConsumer /
+ * alertWithoutConsumer variants. Nothing is chained after it, because those methods return
+ * Unit. `withConsumer` records which variant was used and defaults to true.
+ */
+case class AlertStreamSink(id: Int, upStreamNames: util.List[String], alertExecutorId : String, withConsumer: Boolean=true) extends StreamProducer
+
+/** Singleton counter handing out producer ids; it starts at 0, so the first id returned is 1. */
+object UniqueId{
+  val id : AtomicInteger = new AtomicInteger(0)
+
+  /** Atomically increments the shared counter and returns the new value. */
+  def incrementAndGetId() : Int = id.incrementAndGet()
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala
new file mode 100644
index 0000000..83a83fe
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/StreamUnionExpansion.scala
@@ -0,0 +1,62 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.datastream
+
+import com.typesafe.config.Config
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+import org.slf4j.LoggerFactory
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ListBuffer
+
+/**
+ * DAG expansion that removes every [[StreamUnionProducer]] vertex.
+ *
+ * Consider an edge `current -> union`, where `union = StreamUnionProducer(_, others)`, and
+ * each edge `union -> child`. For every such pair, the expansion adds edge `current -> child`
+ * plus `o -> child` for each `o` in `others`. All of these carry the group-by fields of the
+ * incoming `current -> union` edge. The union vertices are then removed.
+ */
+class StreamUnionExpansion(config: Config) extends StreamDAGExpansion(config){
+  val LOG = LoggerFactory.getLogger(classOf[StreamUnionExpansion])
+
+  override def expand(dag: DirectedAcyclicGraph[StreamProducer, StreamConnector]): Unit = {
+    // Changes are buffered during the traversal and applied afterwards, rather than
+    // mutating the graph while dag.iterator() is walking it.
+    val toBeAddedEdges = new ListBuffer[StreamConnector]
+    val toBeRemovedVertex = new ListBuffer[StreamProducer]
+
+    val iter = dag.iterator()
+    while (iter.hasNext) {
+      val current = iter.next()
+      dag.outgoingEdgesOf(current).foreach { edge =>
+        edge.to match {
+          case union @ StreamUnionProducer(_, others) =>
+            // NOTE(review): grouping comes from the incoming edge (current -> union), not from
+            // the union's outgoing edges. Confirm this fits the order in which expansions run.
+            val groupByFields = edge.groupByFields
+            dag.outgoingEdgesOf(union).foreach { downstream =>
+              val target = downstream.to
+              toBeAddedEdges += StreamConnector(current, target).groupBy(groupByFields)
+              others.foreach(o => toBeAddedEdges += StreamConnector(o, target).groupBy(groupByFields))
+            }
+            toBeRemovedVertex += union
+          case _ =>
+        }
+      }
+    }
+
+    // Add the bypass edges while the union vertices still exist, then drop those vertices.
+    toBeAddedEdges.foreach(e => dag.addEdge(e.from, e.to, e))
+    toBeRemovedVertex.foreach(v => dag.removeVertex(v))
+  }
+}