You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/01/12 08:47:57 UTC
[4/8] incubator-eagle git commit: EAGLE-79 Provide aggregation and
persistence DSL support
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
index 7394b75..b189c57 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
@@ -63,6 +63,8 @@ abstract class ExecutionEnvironmentBase(private val conf:Config) extends Execut
implicit val i_conf = _config.get
StreamNameExpansion()
GraphPrinter.print(dag,message="Before expanded DAG ")
+ StreamAggregateExpansion()
+ GraphPrinter.print(dag,message="after analyze expanded DAG ")
StreamAlertExpansion()
StreamUnionExpansion()
StreamGroupbyExpansion()
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
new file mode 100644
index 0000000..a95001b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
@@ -0,0 +1,82 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.core
+
+import com.typesafe.config.Config
+import org.apache.eagle.dataproc.impl.aggregate.AggregateExecutorFactory
+import org.apache.eagle.datastream.FlatMapper
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+import scala.collection.JavaConversions.asScalaSet
+import scala.collection.mutable.ListBuffer
+
+/**
+ * The expansion job for stream aggregation
+ *
+ * TODO : should re-use flow with stream alert expansion, make code cleaner
+ */
+class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(config) {
+
+ override def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any, Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
+ dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], current: StreamProducer[Any],
+ child: StreamProducer[Any]): Unit = {
+ child match {
+ case AggregateProducer(upStreamNames, analyzerId, cepQl, strategy) => {
+ /**
+ * Rewrite the tree to add output field wrapper since policy executors accept only fixed tuple format
+ */
+ val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
+
+
+ val analyzeExecutors = if (cepQl != null) {
+ AggregateExecutorFactory.Instance.createExecutors(cepQl)
+ } else {
+ AggregateExecutorFactory.Instance.createExecutors(config, upStreamNames, analyzerId)
+ }
+
+ analyzeExecutors.foreach(exec => {
+ val t = FlatMapProducer(exec.asInstanceOf[FlatMapper[Any]]).nameAs(exec.getExecutorId() + "_" + exec.getPartitionSeq()).initWith(dag,config, hook = false)
+
+ // connect with previous
+ if (strategy == null) {
+ newStreamProducers.foreach(s => toBeAddedEdges += StreamConnector(s, t))
+ } else {
+ newStreamProducers.foreach(s => toBeAddedEdges += StreamConnector(s, t, strategy))
+ }
+
+ // connect with next
+ val outgoingEdges = dag.outgoingEdgesOf(child)
+ outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(t, e.to, e))
+ })
+
+ // remove current child
+ toBeRemovedVertex += child
+ }
+ case _ =>
+ }
+ }
+
+}
+
+object StreamAggregateExpansion{
+ def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamAggregateExpansion ={
+ val e = new StreamAggregateExpansion(config)
+ e.expand(dag)
+ e
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
index 84532dc..c731ac9 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
@@ -1,4 +1,5 @@
/*
+
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,18 +20,23 @@ package org.apache.eagle.datastream.core
import java.util
-import com.typesafe.config.Config
-import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl
-import org.apache.eagle.datastream._
+import org.apache.eagle.alert.executor.AlertExecutorCreationUtils
+import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl
+
+import scala.collection.JavaConversions.asScalaSet
+import scala.collection.mutable.ListBuffer
+import org.apache.eagle.datastream.EagleTuple
+import org.apache.eagle.datastream.JavaStormExecutorForAlertWrapper
+import org.apache.eagle.datastream.JavaStormStreamExecutor
+import org.apache.eagle.datastream.StormStreamExecutor
+import org.apache.eagle.datastream.Tuple2
import org.apache.eagle.datastream.storm.StormExecutorForAlertWrapper
import org.apache.eagle.datastream.utils.AlertExecutorConsumerUtils
-import org.apache.eagle.executor.AlertExecutorCreationUtils
import org.apache.eagle.service.client.EagleServiceConnector
import org.jgrapht.experimental.dag.DirectedAcyclicGraph
import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
+import com.typesafe.config.Config
/**
* The constraints for alert is:
@@ -69,7 +75,7 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
toBeRemovedVertex.foreach(v => dag.removeVertex(v))
}
- def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
+ def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], child: StreamProducer[Any]): Unit = {
child match {
case AlertStreamSink(upStreamNames, alertExecutorId, withConsumer,strategy) => {
@@ -77,28 +83,7 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
* step 1: wrapper previous StreamProducer with one more field "streamName"
* for AlertStreamSink, we check previous StreamProducer and replace that
*/
- val newStreamProducers = new ListBuffer[StreamProducer[Any]]
- current match {
- case StreamUnionProducer(others) => {
- val incomingEdges = dag.incomingEdgesOf(current)
- incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0)))
- var i: Int = 1
- others.foreach(o => {
- newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i))
- i += 1
- })
- }
- case _: FlatMapProducer[AnyRef, AnyRef] => {
- newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
- }
- case _: MapperProducer[AnyRef,AnyRef] => {
- newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
- }
- case s: StreamProducer[AnyRef] if dag.inDegreeOf(s) == 0 => {
- newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
- }
- case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported")
- }
+ val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
/**
* step 2: partition alert executor by policy partitioner class
@@ -106,7 +91,7 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, new AlertDefinitionDAOImpl(new EagleServiceConnector(config)), upStreamNames, alertExecutorId)
var alertProducers = new scala.collection.mutable.MutableList[StreamProducer[Any]]
alertExecutors.foreach(exec => {
- val t = FlatMapProducer(exec).nameAs(exec.getAlertExecutorId + "_" + exec.getPartitionSeq).initWith(dag,config,hook = false)
+ val t = FlatMapProducer(exec).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).initWith(dag,config, hook = false)
alertProducers += t
newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector[Any,Any](newsp, t,Seq(0)))
if (strategy == null) {
@@ -129,7 +114,36 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
}
}
- private def replace(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
+ protected def rewriteWithStreamOutputWrapper(current: org.apache.eagle.datastream.core.StreamProducer[Any], dag: org.jgrapht.experimental.dag.DirectedAcyclicGraph[org.apache.eagle.datastream.core.StreamProducer[Any],org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeAddedEdges: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeRemovedVertex: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamProducer[Any]], upStreamNames: java.util.List[String]) = {/**
+ * step 1: wrap the previous StreamProducer with one more field "streamName"
+ * for AlertStreamSink, we check the previous StreamProducer and replace it
+ */
+ val newStreamProducers = new ListBuffer[StreamProducer[Any]]
+ current match {
+ case StreamUnionProducer(others) => {
+ val incomingEdges = dag.incomingEdgesOf(current)
+ incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0)))
+ var i: Int = 1
+ others.foreach(o => {
+ newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i))
+ i += 1
+ })
+ }
+ case _: FlatMapProducer[AnyRef, AnyRef] => {
+ newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+ }
+ case _: MapperProducer[AnyRef,AnyRef] => {
+ newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+ }
+ case s: StreamProducer[AnyRef] if dag.inDegreeOf(s) == 0 => {
+ newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+ }
+ case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported")
+ }
+ newStreamProducers
+ }
+
+ protected def replace(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], upStreamName: String) : StreamProducer[Any]= {
var newsp: StreamProducer[Any] = null
current match {
@@ -201,3 +215,4 @@ object StreamAlertExpansion{
}
}
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
index 4d7dcd1..4d81424 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
@@ -24,13 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger
import backtype.storm.topology.base.BaseRichSpout
import com.typesafe.config.Config
import org.apache.eagle.alert.entity.AlertAPIEntity
-import org.apache.eagle.datastream._
+import org.apache.eagle.datastream.FlatMapper
import org.apache.eagle.partition.PartitionStrategy
+import org.apache.eagle.policy.common.Constants
import org.jgrapht.experimental.dag.DirectedAcyclicGraph
import org.slf4j.LoggerFactory
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
+import scala.collection.JavaConversions.{asScalaBuffer, seqAsJavaList}
+import scala.collection.JavaConverters.asScalaBufferConverter
/**
* StreamProducer = StreamInfo + StreamProtocol
*
@@ -193,6 +194,24 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false)
}
+ def aggregate(upStreamNames: java.util.List[String], queryExecutorId : String, strategy: PartitionStrategy = null): StreamProducer[T] = {
+ val ret= AggregateProducer(upStreamNames, queryExecutorId, null, strategy)
+ hookup(this, ret)
+ ret
+ }
+
+ def aggregate(cql : String, strategy: PartitionStrategy): StreamProducer[T] = {
+ val ret= AggregateProducer(util.Arrays.asList(Constants.EAGLE_DEFAULT_POLICY_NAME), null, cql, strategy)
+ hookup(this, ret)
+ ret
+ }
+
+ def persist(executorId : String, storageType: StorageType.StorageType) : StreamProducer[T] = {
+ val ret = PersistProducer(executorId, storageType)
+ hookup(this, ret)
+ ret
+ }
+
protected def hookup[T1,T2](current: StreamProducer[T1], next: StreamProducer[T2]) = {
current.graph.addVertex(next)
current.graph.addEdge(current, next, StreamConnector(current, next))
@@ -216,7 +235,7 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
/**
* Component name
- *
+ *
* @param componentName component name
* @return
*/
@@ -278,6 +297,10 @@ case class AlertStreamSink(upStreamNames: util.List[String], alertExecutorId : S
}
}
+case class AggregateProducer[T](upStreamNames: util.List[String], analyzerId : String, cepQl: String = null, strategy:PartitionStrategy = null) extends StreamProducer[T]
+
+case class PersistProducer[T](executorId :String, storageType: StorageType.StorageType) extends StreamProducer[T]
+
object UniqueId{
val id : AtomicInteger = new AtomicInteger(0);
def incrementAndGetId() : Int = {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
index 346f728..3ed067d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
@@ -83,6 +83,12 @@ class StreamInfo extends Serializable{
override def hashCode(): Int = new HashCodeBuilder().append(this.id).append(this.getClass).toHashCode
}
+
+object StorageType extends Enumeration {
+ type StorageType = Value
+ val KAFKA, DRUID, HBASE = Value
+}
+
/**
* Stream interaction protocol interface
*
@@ -144,8 +150,16 @@ trait StreamProtocol[+T <: Any]{
* @return
*/
def groupByKey(keyer:T => Any):StreamProducer[T]
+
def streamUnion[T2,T3](otherStreams : Seq[StreamProducer[T2]]) : StreamProducer[T3]
def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean,strategy : PartitionStrategy)
+
+ def aggregate(upStreamNames: java.util.List[String], executorId :String, strategy:PartitionStrategy): StreamProducer[T]
+
+ def aggregate(cql : String, strategy:PartitionStrategy): StreamProducer[T]
+
+ def persist(executorId : String, storageType: StorageType.StorageType): StreamProducer[T]
+
/**
* Set processing element parallelism setting
* @param parallelismNum parallelism value
@@ -173,4 +187,4 @@ trait StreamProtocol[+T <: Any]{
def ~>[R](flatMapper : FlatMapper[R]) = this.flatMap[R](flatMapper)
def ! (upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean = true,strategy: PartitionStrategy = null) = alert(upStreamNames, alertExecutorId, consume,strategy)
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
index b048b90..29b5cf4 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
@@ -20,6 +20,7 @@ package org.apache.eagle.datastream.storm
import backtype.storm.topology.base.BaseRichBolt
import com.typesafe.config.Config
+import org.apache.eagle.dataproc.impl.persist.PersistExecutor
import org.apache.eagle.datastream._
import org.apache.eagle.datastream.core._
@@ -47,6 +48,11 @@ object StormBoltFactory {
case foreach:ForeachProducer[Any] => {
ForeachBoltWrapper(foreach.fn)
}
+ case persist : PersistProducer[Any] => {
+ val persisExecutor = new PersistExecutor(persist.executorId, persist.storageType.toString());
+ persisExecutor.prepareConfig(config);
+ JavaStormBoltWrapper(persisExecutor.asInstanceOf[JavaStormStreamExecutor[EagleTuple]])
+ }
case _ => throw new UnsupportedOperationException(s"Unsupported producer: ${producer.toString}")
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
index 1b0d133..bb34972 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
@@ -18,8 +18,13 @@ package org.apache.eagle.datastream.storm
import java.util
+import org.apache.eagle.datastream.Collector
+import org.apache.eagle.datastream.StormStreamExecutor
+import org.apache.eagle.datastream.StormStreamExecutor3
+import org.apache.eagle.datastream.Tuple2
+import org.apache.eagle.datastream.Tuple3
+
import com.typesafe.config.Config
-import org.apache.eagle.datastream._
case class StormExecutorForAlertWrapper(delegate: StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]], streamName: String)
extends StormStreamExecutor3[String, String, util.SortedMap[Object, Object]]{
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
index 67818e1..76250e2 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
@@ -22,10 +22,10 @@ package org.apache.eagle.datastream.utils
import java.util
import org.apache.eagle.alert.dedup.{AlertEmailDeduplicationExecutor, AlertEntityDeduplicationExecutor}
+import org.apache.eagle.alert.executor.AlertExecutor
import org.apache.eagle.alert.notification.AlertNotificationExecutor
import org.apache.eagle.alert.persist.AlertPersistExecutor
import org.apache.eagle.datastream.core.{StreamConnector, FlatMapProducer, StreamProducer}
-import org.apache.eagle.executor.AlertExecutor
import org.slf4j.{Logger, LoggerFactory}
import scala.collection.mutable.ListBuffer
@@ -54,8 +54,8 @@ object AlertExecutorConsumerUtils {
def setupAlertConsumers(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], alertStreamProducers: List[StreamProducer[Any]]): Unit = {
val alertExecutorIdList: java.util.List[String] = new util.ArrayList[String]()
alertStreamProducers.map(x =>
- alertExecutorIdList.add(x.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getAlertExecutorId));
- val alertDefDao = alertStreamProducers.head.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getAlertDefinitionDao
+ alertExecutorIdList.add(x.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getExecutorId));
+ val alertDefDao = alertStreamProducers.head.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getPolicyDefinitionDao
val entityDedupExecutor: AlertEntityDeduplicationExecutor = new AlertEntityDeduplicationExecutor(alertExecutorIdList, alertDefDao)
val emailDedupExecutor: AlertEmailDeduplicationExecutor = new AlertEmailDeduplicationExecutor(alertExecutorIdList, alertDefDao)
val notificationExecutor: AlertNotificationExecutor = new AlertNotificationExecutor(alertExecutorIdList, alertDefDao)
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
new file mode 100644
index 0000000..ed5d705
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.datastream.core.*;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+import org.apache.eagle.partition.PartitionStrategy;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
+import org.junit.Before;
+import org.junit.Test;
+import scala.collection.Seq;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * @since Dec 18, 2015
+ *
+ */
+public class TestStreamAggregate {
+
+ private Config config;
+
+ @SuppressWarnings("serial")
+ private final class SimpleSpout extends BaseRichSpout {
+ @SuppressWarnings("rawtypes")
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ }
+ @Override
+ public void nextTuple() {
+ }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ }
+ }
+
+ public static class TestEnvironment extends StormExecutionEnvironment {
+ private static final long serialVersionUID = 1L;
+ public TestEnvironment(Config conf) {
+ super(conf);
+ }
+ @Override
+ public void execute(StreamDAG dag) {
+ System.out.println("DAT completed!");
+ }
+ }
+
+ public static class DummyStrategy implements PartitionStrategy {
+ private static final long serialVersionUID = 1L;
+ @Override
+ public int balance(String key, int buckNum) {
+ return 0;
+ }
+ };
+
+ public static class DummyExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> {
+ @Override
+ public void prepareConfig(Config config) {
+ }
+ @Override
+ public void init() {
+ }
+ @Override
+ public void flatMap(List input, Collector collector) {
+ }
+ }
+
+ @Before
+ public void setUp() {
+ System.setProperty("config.resource", "/application.conf");
+ ConfigFactory.invalidateCaches();
+ config = ConfigFactory.load();
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes", "serial" })
+ @Test
+ public void testAggregate1() {
+ StormExecutionEnvironment exe = new TestEnvironment(config);
+
+ BaseRichSpout spout = new SimpleSpout();
+ StormSourceProducer ssp = exe.fromSpout(spout);
+
+ ssp.flatMap(new FlatMapper<String>() {
+ @Override
+ public void flatMap(Seq<Object> input, Collector<String> collector) {
+ // do nothing
+ }
+ }).aggregate(Arrays.asList("c3EsLogEventStream"), "qid", new DummyStrategy());
+
+ try {
+ exe.execute();
+ Assert.fail("customzied flat mapper(non java storm executor) before analyze is not supported!");
+ } catch (Exception e ){
+ }
+ }
+
+ @SuppressWarnings({ "unchecked", "rawtypes", "serial" })
+ @Test
+ public void testAggregate() {
+ StormExecutionEnvironment exe = new TestEnvironment(config);
+ StormSourceProducer ssp = exe.fromSpout(new SimpleSpout());
+ DummyExecutor dummy = new DummyExecutor();
+ ssp.flatMap(dummy).aggregate(Arrays.asList("c3EsLogEventStream"), "analyzeStreamExecutor", new DummyStrategy());
+
+ try {
+ exe.execute();
+ } catch (Exception e) {
+ Assert.fail("customized flat mapper before");
+ }
+ // Assertion
+ DirectedAcyclicGraph<StreamProducer<Object>, StreamConnector<Object, Object>> dag = exe.dag();
+ Assert.assertEquals("three vertex", 3, dag.vertexSet().size());
+ boolean hasWrapped = false;
+ for (StreamProducer<Object> obj : dag.vertexSet()) {
+ if (obj instanceof FlatMapProducer) {
+ if (((FlatMapProducer) obj).mapper() instanceof JavaStormExecutorForAlertWrapper) {
+ hasWrapped = true;
+ Assert.assertEquals("dummy executor should be wrapped in the alert wrapper func", dummy,
+ ((JavaStormExecutorForAlertWrapper) ((FlatMapProducer) obj).mapper() ).getDelegate());
+
+ }
+ }
+ }
+ Assert.assertTrue(hasWrapped);
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
index c098a7a..7b80481 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
@@ -36,7 +36,24 @@
"alertExecutorConfigs" : {
"eventStreamExecutor" : {
"parallelism" : 1,
- "partitioner" : "org.apache.eagle.alert.policy.DefaultPolicyPartitioner"
+ "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+ "needValidation" : "true"
+ }
+ },
+ "persistExecutorConfigs" {
+ "persistExecutor1" : {
+ "kafka": {
+ "bootstrap_servers" : "localhost",
+ "topics" : {
+ "defaultOutput" : "downSampleOutput"
+ }
+ }
+ }
+ },
+ "aggregateExecutorConfigs" : {
+ "aggregateStreamExecutor" : {
+ "parallelism" : 1,
+ "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
"needValidation" : "true"
}
},
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
index 22b3e5c..5005c42 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
@@ -16,7 +16,7 @@
*/
package org.apache.eagle.ml;
-import org.apache.eagle.alert.siddhi.EagleAlertContext;
+import org.apache.eagle.policy.PolicyEvaluationContext;
import org.apache.eagle.ml.model.MLCallbackResult;
public interface MLAnomalyCallback {
@@ -24,5 +24,5 @@ public interface MLAnomalyCallback {
* @param callbackResult call-backed result
* @param alertContext context
*/
- void receive(MLCallbackResult callbackResult,EagleAlertContext alertContext);
+ void receive(MLCallbackResult callbackResult,PolicyEvaluationContext alertContext);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
index c2a8d6b..ff88a7a 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
@@ -16,27 +16,31 @@
*/
package org.apache.eagle.ml;
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.config.AbstractPolicyDefinition;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.config.AbstractPolicyDefinition;
import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.policy.PolicyEvaluator;
-import org.apache.eagle.alert.policy.PolicyManager;
+import org.apache.eagle.policy.PolicyEvaluator;
+import org.apache.eagle.policy.PolicyManager;
import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
import org.apache.eagle.dataproc.core.ValuesArray;
import org.apache.eagle.ml.impl.MLAnomalyCallbackImpl;
import org.apache.eagle.ml.model.MLAlgorithm;
import org.apache.eagle.ml.model.MLPolicyDefinition;
import org.apache.eagle.ml.utils.MLReflectionUtils;
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.*;
+import com.typesafe.config.Config;
-public class MLPolicyEvaluator implements PolicyEvaluator {
+public class MLPolicyEvaluator implements PolicyEvaluator<AlertDefinitionAPIEntity> {
private static Logger LOG = LoggerFactory.getLogger(MLPolicyEvaluator.class);
- private final String[] sourceStreams;
private volatile MLRuntime mlRuntime;
private String policyName;
private Config config;
@@ -61,10 +65,9 @@ public class MLPolicyEvaluator implements PolicyEvaluator {
public MLPolicyEvaluator(Config config, String policyName, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){
this.config = config;
this.policyName = policyName;
- this.sourceStreams = sourceStreams;
LOG.info("Initializing policy named: "+policyName);
this.context = new HashMap<>();
- this.context.put(AlertConstants.SOURCE_STREAMS, StringUtils.join(sourceStreams,","));
+ this.context.put(Constants.SOURCE_STREAMS, StringUtils.join(sourceStreams,","));
this.init(policyDef);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
index 4684d8d..c74826b 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
@@ -16,23 +16,24 @@
*/
package org.apache.eagle.ml.impl;
-import org.apache.eagle.alert.common.AlertConstants;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.policy.common.Constants;
import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.siddhi.EagleAlertContext;
+import org.apache.eagle.policy.PolicyEvaluationContext;
import org.apache.eagle.common.DateTimeUtil;
import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.common.metric.AlertContext;
import org.apache.eagle.ml.MLAnomalyCallback;
import org.apache.eagle.ml.MLPolicyEvaluator;
import org.apache.eagle.ml.model.MLCallbackResult;
-import org.apache.eagle.common.metric.AlertContext;
-import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
+import com.typesafe.config.Config;
public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
private static Logger LOG = LoggerFactory.getLogger(MLAnomalyCallbackImpl.class);
@@ -54,13 +55,13 @@ public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
* @param alertContext context
*/
@Override
- public void receive(MLCallbackResult aResult,EagleAlertContext alertContext) {
+ public void receive(MLCallbackResult aResult,PolicyEvaluationContext alertContext) {
LOG.info("Receive called with : " + aResult.toString());
AlertAPIEntity alert = renderAlert(aResult,alertContext);
- alertContext.alertExecutor.onAlerts(alertContext, Arrays.asList(alert));
+ alertContext.alertExecutor.onEvalEvents(alertContext, Arrays.asList(alert));
}
- private AlertAPIEntity renderAlert(MLCallbackResult aResult,EagleAlertContext alertContext){
+ private AlertAPIEntity renderAlert(MLCallbackResult aResult,PolicyEvaluationContext alertContext){
String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
@@ -70,10 +71,10 @@ public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
Map<String, String> tags = new HashMap<>();
tags.put(EagleConfigConstants.SITE, site);
tags.put(EagleConfigConstants.DATA_SOURCE, dataSource);
- tags.put(AlertConstants.SOURCE_STREAMS, alertContext.evaluator.getAdditionalContext().get(AlertConstants.SOURCE_STREAMS));
- tags.put(AlertConstants.POLICY_ID, alertContext.policyId);
- tags.put(AlertConstants.ALERT_SOURCE, source);
- tags.put(AlertConstants.ALERT_EXECUTOR_ID, alertContext.alertExecutor.getAlertExecutorId());
+ tags.put(Constants.SOURCE_STREAMS, (String)alertContext.evaluator.getAdditionalContext().get(Constants.SOURCE_STREAMS));
+ tags.put(Constants.POLICY_ID, alertContext.policyId);
+ tags.put(Constants.ALERT_SOURCE, source);
+ tags.put(Constants.ALERT_EXECUTOR_ID, alertContext.alertExecutor.getExecutorId());
entity.setTags(tags);
entity.setTimestamp(aResult.getTimestamp());
@@ -83,9 +84,9 @@ public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
if(aResult.getContext() != null) context.addAll(aResult.getContext());
String alertMessage = "Anomaly activities detected by algorithm ["+aResult.getAlgorithmName()+"] with information: " + aResult.toString() ;
- context.addProperty(AlertConstants.ALERT_EVENT, aResult.toString());
- context.addProperty(AlertConstants.ALERT_MESSAGE, alertMessage);
- context.addProperty(AlertConstants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
+ context.addProperty(Constants.ALERT_EVENT, aResult.toString());
+ context.addProperty(Constants.ALERT_MESSAGE, alertMessage);
+ context.addProperty(Constants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
try {
site = config.getString("eagleProps.site");
@@ -98,7 +99,7 @@ public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
context.addProperty(EagleConfigConstants.DATA_SOURCE, dataSource);
context.addProperty(EagleConfigConstants.SITE, site);
- context.addProperty(AlertConstants.POLICY_NAME, alertContext.policyId);
+ context.addProperty(Constants.POLICY_NAME, alertContext.policyId);
entity.setAlertContext(context);
return entity;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
index f61a902..d0ac75f 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
@@ -16,9 +16,9 @@
*/
package org.apache.eagle.ml.impl;
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.policy.PolicyEvaluator;
-import org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.PolicyEvaluator;
+import org.apache.eagle.policy.PolicyEvaluatorServiceProvider;
import org.apache.eagle.ml.MLPolicyEvaluator;
import org.apache.eagle.ml.model.MLPolicyDefinition;
import com.fasterxml.jackson.databind.Module;
@@ -35,7 +35,7 @@ public class MLPolicyEvaluatorServiceProviderImpl implements PolicyEvaluatorServ
@Override
public String getPolicyType() {
- return AlertConstants.policyType.MachineLearning.name();
+ return Constants.policyType.MachineLearning.name();
}
@Override
@@ -45,7 +45,7 @@ public class MLPolicyEvaluatorServiceProviderImpl implements PolicyEvaluatorServ
@Override
public List<Module> getBindingModules() {
- Module module1 = new SimpleModule(AlertConstants.POLICY_DEFINITION).registerSubtypes(new NamedType(MLPolicyDefinition.class, getPolicyType()));
+ Module module1 = new SimpleModule(Constants.POLICY_DEFINITION).registerSubtypes(new NamedType(MLPolicyDefinition.class, getPolicyType()));
Module module2 = new SimpleModule(ALERT_CONTEXT).registerSubtypes(new NamedType(Properties.class, getPolicyType()));
return Arrays.asList(module1, module2);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
index 71fabbf..70e0f71 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
@@ -16,7 +16,7 @@
*/
package org.apache.eagle.ml.model;
-import org.apache.eagle.alert.config.AbstractPolicyDefinition;
+import org.apache.eagle.policy.config.AbstractPolicyDefinition;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index d7a2754..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
new file mode 100644
index 0000000..d7a2754
--- /dev/null
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index d7a2754..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
new file mode 100644
index 0000000..d7a2754
--- /dev/null
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
index 4d8f08c..657011c 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
@@ -46,7 +46,7 @@
"alertExecutorConfigs" : {
"userAnomalousActivityDetectionAlertExecutor" : {
"parallelism" : 2,
- "partitioner" : "org.apache.eagle.alert.policy.DefaultPolicyPartitioner"
+ "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
}
},
"dynamicConfigSource" : {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/pom.xml b/eagle-core/eagle-policy/eagle-policy-base/pom.xml
new file mode 100644
index 0000000..5f2b9f8
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <!-- The parent POM is resolved from ../pom.xml. -->
+ <parent>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-policy-parent</artifactId>
+ <version>0.3.0</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <!-- This module declares no groupId or version of its own. -->
+ <artifactId>eagle-policy-base</artifactId>
+ <packaging>jar</packaging>
+ <name>eagle-policy-base</name>
+ <!-- The four eagle dependencies use ${project.version}; siddhi-extension-string uses ${siddhi.version}, which is not defined in this file. -->
+ <dependencies>
+ <dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-entity-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-client-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-metric</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>eagle</groupId>
+ <artifactId>eagle-stream-process-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.siddhi</groupId>
+ <artifactId>siddhi-extension-string</artifactId>
+ <version>${siddhi.version}</version>
+ </dependency>
+ </dependencies>
+</project>
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
new file mode 100644
index 0000000..3f45be7
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+@SuppressWarnings("serial")
+public abstract class AbstractPolicyDefinitionEntity extends TaggedLogAPIEntity {
+ // Common base for policy-definition entities; subclasses must supply the two accessors below.
+ public abstract String getPolicyDef(); // policy definition as a String (format not visible here)
+
+ public abstract boolean isEnabled(); // whether this policy definition is enabled
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
new file mode 100644
index 0000000..0e5d3c8
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Alert entity mapped by its annotations to table "alertdetail", column family "f" and
+ * prefix "hadoop", and flagged as a time-series entity.
+ *
+ * Each setter fires a property-change event (null old/new values) named after the column
+ * it assigns, so that all five columns are reported uniformly.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertdetail")
+@ColumnFamily("f")
+@Prefix("hadoop")
+@Service(Constants.ALERT_SERVICE_ENDPOINT_NAME)
+@TimeSeries(true)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AlertAPIEntity extends TaggedLogAPIEntity{
+    @Column("description")
+    private String description;
+    @Column("remediationID")
+    private String remediationID;
+    @Column("remediationCallback")
+    private String remediationCallback;
+    @Column("alertContext")
+    private AlertContext alertContext;
+    @Column("streamId")
+    private String streamId;
+
+    public String getDescription() { return description; }
+
+    public void setDescription(String description) {
+        this.description = description;
+        _pcs.firePropertyChange("description", null, null);
+    }
+
+    public String getRemediationID() { return remediationID; }
+
+    public void setRemediationID(String remediationID) {
+        this.remediationID = remediationID;
+        _pcs.firePropertyChange("remediationID", null, null);
+    }
+
+    public String getRemediationCallback() { return remediationCallback; }
+
+    public void setRemediationCallback(String remediationCallback) {
+        this.remediationCallback = remediationCallback;
+        _pcs.firePropertyChange("remediationCallback", null, null);
+    }
+
+    public AlertContext getAlertContext() { return alertContext; }
+
+    public void setAlertContext(AlertContext alertContext) {
+        this.alertContext = alertContext;
+        _pcs.firePropertyChange("alertContext", null, null);
+    }
+
+    public String getStreamId() { return streamId; }
+
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+        // Notify like the other @Column setters; previously this was the only one that did not.
+        // NOTE(review): presumably the base class uses these events to track changed columns - confirm.
+        _pcs.firePropertyChange("streamId", null, null);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java
new file mode 100644
index 0000000..f001725
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.eagle.log.entity.meta.EntitySerDeser;
+
+/**
+ * Serializes an {@link AlertContext} to and from a length-prefixed byte layout:
+ * [entryCount:int] followed, per entry, by [keyLen:int][valueLen:int][keyBytes][valueBytes].
+ * Strings are encoded with Bytes.toBytes and decoded with Bytes.toString (both UTF-8 per the
+ * HBase documentation), so the payload does not depend on the JVM's platform default charset.
+ */
+public class AlertContextSerDeser implements EntitySerDeser<AlertContext> {
+    private static final byte[] EMPTY = new byte[0];
+
+    /**
+     * Rebuilds a context from bytes produced by {@link #serialize(AlertContext)}.
+     * @param bytes serialized payload; null or fewer than 4 bytes yields an empty context
+     * @return a context holding every decoded key/value pair
+     */
+    @Override
+    public AlertContext deserialize(byte[] bytes) {
+        AlertContext context = new AlertContext();
+        if (bytes == null || bytes.length < 4) { return context; }
+        final int size = Bytes.toInt(bytes, 0, 4);
+        Map<String, String> properties = new HashMap<String, String>();
+        int offset = 4;
+        for (int i = 0; i < size; i++) {
+            // Each entry starts with both lengths, then the key bytes, then the value bytes.
+            final int keySize = Bytes.toInt(bytes, offset, 4);
+            final int valueSize = Bytes.toInt(bytes, offset + 4, 4);
+            offset += 8;
+            properties.put(Bytes.toString(bytes, offset, keySize),
+                           Bytes.toString(bytes, offset + keySize, valueSize));
+            offset += keySize + valueSize;
+        }
+        context.addAll(properties);
+        return context;
+    }
+
+    /**
+     * Encodes the context's properties; a null key or value is written with length zero.
+     * @param context context whose properties are written
+     * @return the encoded byte array
+     */
+    @Override
+    public byte[] serialize(AlertContext context) {
+        final Map<String, String> pair = context.getProperties();
+        // Encode every string exactly once: String.getBytes() would use the platform default
+        // charset, which need not match what Bytes.toString decodes with in deserialize().
+        final byte[][] encoded = new byte[pair.size() * 2][];
+        int totalSize = 4;
+        int n = 0;
+        for (Entry<String, String> entry : pair.entrySet()) {
+            encoded[n] = entry.getKey() != null ? Bytes.toBytes(entry.getKey()) : EMPTY;
+            encoded[n + 1] = entry.getValue() != null ? Bytes.toBytes(entry.getValue()) : EMPTY;
+            totalSize += encoded[n].length + encoded[n + 1].length + 8;
+            n += 2;
+        }
+        byte[] buffer = new byte[totalSize];
+        int offset = Bytes.putInt(buffer, 0, pair.size());
+        for (int i = 0; i < n; i += 2) {
+            // putInt/putBytes return the offset just past what they wrote.
+            offset = Bytes.putInt(buffer, offset, encoded[i].length);
+            offset = Bytes.putInt(buffer, offset, encoded[i + 1].length);
+            offset = Bytes.putBytes(buffer, offset, encoded[i], 0, encoded[i].length);
+            offset = Bytes.putBytes(buffer, offset, encoded[i + 1], 0, encoded[i + 1].length);
+        }
+        return buffer;
+    }
+
+    /** @return the entity class this serializer handles */
+    @Override
+    public Class<AlertContext> type(){
+        return AlertContext.class;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java
new file mode 100644
index 0000000..173481d
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Entity describing an alert data source, tagged with "site" and "dataSource".
+ *
+ * It carries an enabled flag, a config string and a description. Every setter assigns
+ * the field and then calls valueChanged(...) with the field name; valueChanged is not
+ * defined in this class (presumably inherited from TaggedLogAPIEntity - confirm there).
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertDataSource")
+@ColumnFamily("f")
+@Prefix("alertDataSource")
+@Service(Constants.ALERT_DATA_SOURCE_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"site", "dataSource"})
+public class AlertDataSourceEntity extends TaggedLogAPIEntity{
+ /** Whether this data source is enabled; mapped to column "a". */
+ @Column("a")
+ private boolean enabled;
+ /** Configuration string of the data source (format not specified here); mapped to column "b". */
+ @Column("b")
+ private String config;
+ /** Free-text description; mapped to column "c". */
+ @Column("c")
+ private String desc;
+
+ /** @return the configuration string, possibly null if never set */
+ public String getConfig() {
+ return config;
+ }
+
+ /** Sets the configuration string and reports the change via valueChanged("config"). */
+ public void setConfig(String config) {
+ this.config = config;
+ valueChanged("config");
+ }
+
+ /** @return the enabled flag */
+ public boolean isEnabled() {
+ return enabled;
+ }
+
+ /** Sets the enabled flag and reports the change via valueChanged("enabled"). */
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ valueChanged("enabled");
+ }
+
+ /** @return the description, possibly null if never set */
+ public String getDesc() {
+ return desc;
+ }
+
+ /** Sets the description and reports the change via valueChanged("desc"). */
+ public void setDesc(String desc) {
+ this.desc = desc;
+ valueChanged("desc");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java
new file mode 100644
index 0000000..727506a
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Index;
+import org.apache.eagle.log.entity.meta.Indexes;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * site: site name
+ * dataSource: data source name
+ *
+ * alertExecutorId: Group Policy by alertExecutorId, the policy definition with the sample ["site", "dataSource", "alertExecutorId"] should run on the sample alert executor
+ *
+ * policyId: policy name, should be unique
+ * policyType: policy engine implementation type
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertdef")
+@ColumnFamily("f")
+@Prefix("alertdef")
+@Service(Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"site", "dataSource", "alertExecutorId", "policyId", "policyType"})
+@Indexes({
+ @Index(name="Index_1_alertExecutorId", columns = { "alertExecutorID" }, unique = true),
+})
+public class AlertDefinitionAPIEntity extends AbstractPolicyDefinitionEntity {
+ @Column("a")
+ private String desc;
+ @Column("b")
+ private String policyDef;
+ @Column("c")
+ private String dedupeDef;
+ @Column("d")
+ private String notificationDef;
+ @Column("e")
+ private String remediationDef;
+ @Column("f")
+ private boolean enabled;
+ @Column("g")
+ private String owner;
+ @Column("h")
+ private long lastModifiedDate;
+ @Column("i")
+ private long severity;
+ @Column("j")
+ private long createdTime;
+
+ public String getDesc() {
+ return desc;
+ }
+ public void setDesc(String desc) {
+ this.desc = desc;
+ valueChanged("desc");
+ }
+ public String getPolicyDef() {
+ return policyDef;
+ }
+ public void setPolicyDef(String policyDef) {
+ this.policyDef = policyDef;
+ valueChanged("policyDef");
+ }
+ public String getDedupeDef() {
+ return dedupeDef;
+ }
+ public void setDedupeDef(String dedupeDef) {
+ this.dedupeDef = dedupeDef;
+ valueChanged("dedupeDef");
+ }
+ public String getNotificationDef() {
+ return notificationDef;
+ }
+ public void setNotificationDef(String notificationDef) {
+ this.notificationDef = notificationDef;
+ valueChanged("notificationDef");
+ }
+ public String getRemediationDef() {
+ return remediationDef;
+ }
+ public void setRemediationDef(String remediationDef) {
+ this.remediationDef = remediationDef;
+ valueChanged("remediationDef");
+ }
+ public boolean isEnabled() {
+ return enabled;
+ }
+ public void setEnabled(boolean enabled) {
+ this.enabled = enabled;
+ valueChanged("enabled");
+ }
+ public String getOwner() {
+ return owner;
+ }
+ public void setOwner(String owner) {
+ this.owner = owner;
+ valueChanged("owner");
+ }
+ public long getLastModifiedDate() {
+ return lastModifiedDate;
+ }
+ public void setLastModifiedDate(long lastModifiedDate) {
+ this.lastModifiedDate = lastModifiedDate;
+ valueChanged("lastModifiedDate");
+ }
+ public long getSeverity() {
+ return severity;
+ }
+ public void setSeverity(long severity) {
+ this.severity = severity;
+ valueChanged("severity");
+ }
+ public long getCreatedTime() {
+ return createdTime;
+ }
+ public void setCreatedTime(long createdTime) {
+ this.createdTime = createdTime;
+ valueChanged("createdTime");
+ }
+ public boolean equals(Object o){
+ if(o == this)
+ return true;
+ if(!(o instanceof AlertDefinitionAPIEntity))
+ return false;
+ AlertDefinitionAPIEntity that = (AlertDefinitionAPIEntity)o;
+ if(that.enabled == this.enabled &&
+ compare(that.policyDef, this.policyDef) &&
+ compare(that.dedupeDef, this.dedupeDef) &&
+ compare(that.notificationDef, this.notificationDef) &&
+ compare(that.remediationDef, this.remediationDef))
+ return true;
+ return false;
+ }
+
+ private boolean compare(String a, String b){
+ if(a == b)
+ return true;
+ if(a == null || b == null)
+ return false;
+ if(a.equals(b))
+ return true;
+ return false;
+ }
+
+ public int hashCode(){
+ HashCodeBuilder builder = new HashCodeBuilder();
+ builder.append(enabled);
+ builder.append(policyDef);
+ builder.append(dedupeDef);
+ builder.append(notificationDef);
+ builder.append(remediationDef);
+ return builder.toHashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
new file mode 100644
index 0000000..ad518e9
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+/**
+ * Entity repository for the alert-related entities of this package.
+ *
+ * The constructor fills serDeserMap and entitySet, neither of which is declared in this
+ * class (presumably both are inherited from EntityRepository - confirm there).
+ */
+public class AlertEntityRepository extends EntityRepository {
+ /** Registers the AlertContext serializer and the alert entity classes. */
+ public AlertEntityRepository() {
+ // AlertContext values are handled by AlertContextSerDeser
+ serDeserMap.put(AlertContext.class, new AlertContextSerDeser());
+ // entity classes made known through this repository
+ entitySet.add(AlertAPIEntity.class);
+ entitySet.add(AlertDefinitionAPIEntity.class);
+ entitySet.add(AlertStreamSchemaEntity.class);
+ entitySet.add(AlertStreamEntity.class);
+ entitySet.add(AlertDataSourceEntity.class);
+ entitySet.add(AlertExecutorEntity.class);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
new file mode 100644
index 0000000..97f18f7
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Entity describing an alert executor, tagged with "dataSource", "alertExecutorId"
+ * and "streamName". Its only column is a description.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertExecutor")
+@ColumnFamily("f")
+@Prefix("alertExecutor")
+@Service(Constants.ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "alertExecutorId", "streamName"})
+public class AlertExecutorEntity extends TaggedLogAPIEntity{
+ /** Free-text description; mapped to column "a". */
+ @Column("a")
+ private String desc;
+
+ /** @return the description, possibly null if never set */
+ public String getDesc() {
+ return desc;
+ }
+
+ /**
+  * Sets the description and then calls valueChanged("desc"); valueChanged is not defined
+  * in this class (presumably inherited from TaggedLogAPIEntity - confirm there).
+  */
+ public void setDesc(String desc) {
+ this.desc = desc;
+ valueChanged("desc");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
new file mode 100644
index 0000000..adff3d5
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.policy.common.Constants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+/**
+ * Entity describing an alert stream, tagged with "dataSource" and "streamName".
+ * Its only column is a description.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertStream")
+@ColumnFamily("f")
+@Prefix("alertStream")
+@Service(Constants.ALERT_STREAM_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "streamName"})
+public class AlertStreamEntity extends TaggedLogAPIEntity{
+ /** Free-text description; mapped to column "a". */
+ @Column("a")
+ private String desc;
+
+ /** @return the description, possibly null if never set */
+ public String getDesc() {
+ return desc;
+ }
+ /**
+  * Sets the description and then calls valueChanged("desc"); valueChanged is not defined
+  * in this class (presumably inherited from TaggedLogAPIEntity - confirm there).
+  */
+ public void setDesc(String desc) {
+ this.desc = desc;
+ valueChanged("desc");
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
new file mode 100644
index 0000000..2ba2166
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.policy.common.Constants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+/**
+ * Entity describing one attribute of an alert stream's schema, tagged with
+ * "dataSource", "streamName" and "attrName".
+ *
+ * DDL to create the backing stream metadata table (looks like HBase shell syntax - confirm):
+ *
+ * create 'alertStreamSchema', {NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', COMPRESSION => 'SNAPPY'}
+ *
+ * Every setter assigns the field and then calls valueChanged(...) with the field name;
+ * valueChanged is not defined in this class (presumably inherited from TaggedLogAPIEntity).
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertStreamSchema")
+@ColumnFamily("f")
+@Prefix("alertStreamSchema")
+@Service(Constants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "streamName", "attrName"})
+public class AlertStreamSchemaEntity extends TaggedLogAPIEntity{
+ /** Type of the attribute, kept as a string; column "a". */
+ @Column("a")
+ private String attrType;
+ /** Category of the attribute; column "b". */
+ @Column("b")
+ private String category;
+ /** Value resolver for the attribute, kept as a string; column "c". */
+ @Column("c")
+ private String attrValueResolver;
+ /* Whether the attribute is used as a tag; all tags together form the key for alert de-duplication. Boxed, so it may be null. */
+ @Column("d")
+ private Boolean usedAsTag;
+ /** Description of the attribute; column "e". */
+ @Column("e")
+ private String attrDescription;
+ /** Display name of the attribute; column "f". */
+ @Column("f")
+ private String attrDisplayName;
+ /** Default value of the attribute, kept as a string; column "g". */
+ @Column("g")
+ private String defaultValue;
+
+ /** @return the attribute type, possibly null if never set */
+ public String getAttrType() {
+ return attrType;
+ }
+ /** Sets the attribute type and reports the change via valueChanged("attrType"). */
+ public void setAttrType(String attrType) {
+ this.attrType = attrType;
+ valueChanged("attrType");
+ }
+ /** @return the category, possibly null if never set */
+ public String getCategory() {
+ return category;
+ }
+ /** Sets the category and reports the change via valueChanged("category"). */
+ public void setCategory(String category) {
+ this.category = category;
+ valueChanged("category");
+ }
+ /** @return the value resolver, possibly null if never set */
+ public String getAttrValueResolver() {
+ return attrValueResolver;
+ }
+ /** Sets the value resolver and reports the change via valueChanged("attrValueResolver"). */
+ public void setAttrValueResolver(String attrValueResolver) {
+ this.attrValueResolver = attrValueResolver;
+ valueChanged("attrValueResolver");
+ }
+ /** @return the used-as-tag flag, possibly null if never set */
+ public Boolean getUsedAsTag() {
+ return usedAsTag;
+ }
+ /** Sets the used-as-tag flag and reports the change via valueChanged("usedAsTag"). */
+ public void setUsedAsTag(Boolean usedAsTag) {
+ this.usedAsTag = usedAsTag;
+ valueChanged("usedAsTag");
+ }
+ /** @return the attribute description, possibly null if never set */
+ public String getAttrDescription() {
+ return attrDescription;
+ }
+ /** Sets the attribute description and reports the change via valueChanged("attrDescription"). */
+ public void setAttrDescription(String attrDescription) {
+ this.attrDescription = attrDescription;
+ valueChanged("attrDescription");
+ }
+ /** @return the display name, possibly null if never set */
+ public String getAttrDisplayName() {
+ return attrDisplayName;
+ }
+ /** Sets the display name and reports the change via valueChanged("attrDisplayName"). */
+ public void setAttrDisplayName(String attrDisplayName) {
+ this.attrDisplayName = attrDisplayName;
+ valueChanged("attrDisplayName");
+ }
+ /** @return the default value, possibly null if never set */
+ public String getDefaultValue() {
+ return defaultValue;
+ }
+ /** Sets the default value and reports the change via valueChanged("defaultValue"). */
+ public void setDefaultValue(String defaultValue) {
+ this.defaultValue = defaultValue;
+ valueChanged("defaultValue");
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
new file mode 100644
index 0000000..1143b11
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+
+/**
+ * Default {@link PolicyPartitioner}: picks a partition from the hash codes of the
+ * policy type and the policy id.
+ */
+public class DefaultPolicyPartitioner implements PolicyPartitioner{
+    /**
+     * Maps a policy to a partition index.
+     *
+     * @param numTotalPartitions number of partitions; expected to be positive
+     * @param policyType policy type, must not be null
+     * @param policyId policy id, must not be null
+     * @return for a positive numTotalPartitions, an index in [0, numTotalPartitions)
+     * @throws ArithmeticException if numTotalPartitions is 0
+     * @throws NullPointerException if policyType or policyId is null
+     */
+    @Override
+    public int partition(int numTotalPartitions, String policyType,
+            String policyId) {
+        final int prime = 31;
+        int result = 1;
+        result = result * prime + policyType.hashCode();
+        result = result < 0 ? result*-1 : result;
+        result = result * prime + policyId.hashCode();
+        result = result < 0 ? result*-1 : result;
+        // result*-1 overflows for Integer.MIN_VALUE and leaves it negative, which would make
+        // the modulo below negative. Clearing the sign bit maps that single value to 0 and
+        // leaves every non-negative value (i.e. every other input) untouched.
+        return (result & Integer.MAX_VALUE) % numTotalPartitions;
+    }
+}