You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/01/12 08:47:57 UTC

[4/8] incubator-eagle git commit: EAGLE-79 Provide aggregation and persistence DSL support

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
index 7394b75..b189c57 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
@@ -63,6 +63,8 @@ abstract class ExecutionEnvironmentBase(private val conf:Config)  extends Execut
     implicit val i_conf = _config.get
     StreamNameExpansion()
     GraphPrinter.print(dag,message="Before expanded DAG ")
+    StreamAggregateExpansion()
+    GraphPrinter.print(dag,message="after analyze expanded DAG ")
     StreamAlertExpansion()
     StreamUnionExpansion()
     StreamGroupbyExpansion()

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
new file mode 100644
index 0000000..a95001b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
@@ -0,0 +1,82 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.core
+
+import com.typesafe.config.Config
+import org.apache.eagle.dataproc.impl.aggregate.AggregateExecutorFactory
+import org.apache.eagle.datastream.FlatMapper
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+import scala.collection.JavaConversions.asScalaSet
+import scala.collection.mutable.ListBuffer
+
+/**
+ * DAG expansion for aggregate (stream analysis) nodes: each AggregateProducer is replaced by one FlatMapProducer per executor.
+ * Edges and vertex removals are only queued in toBeAddedEdges / toBeRemovedVertex here; this method does not apply them itself.
+ * TODO : should re-use flow with stream alert expansion, make code cleaner
+ */
+class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(config) {
+
+  override def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any, Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
+    dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any, Any]], current: StreamProducer[Any],
+    child: StreamProducer[Any]): Unit = {
+    child match {
+      case AggregateProducer(upStreamNames, analyzerId, cepQl, strategy) => {
+        /**
+         * Rewrite the tree to add an output-field wrapper, since policy executors accept only a fixed tuple format
+         */
+        val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
+
+
+        val analyzeExecutors = if (cepQl != null) { // an inline query (cepQl) takes precedence over the config/analyzerId lookup
+          AggregateExecutorFactory.Instance.createExecutors(cepQl)
+        } else {
+          AggregateExecutorFactory.Instance.createExecutors(config, upStreamNames, analyzerId)
+        }
+
+        analyzeExecutors.foreach(exec => {
+          val t = FlatMapProducer(exec.asInstanceOf[FlatMapper[Any]]).nameAs(exec.getExecutorId() + "_" + exec.getPartitionSeq()).initWith(dag,config, hook = false)
+
+          // connect every rewritten upstream producer to the new executor node, passing the partition strategy when one was given
+          if (strategy == null) {
+            newStreamProducers.foreach(s => toBeAddedEdges += StreamConnector(s, t))
+          } else {
+            newStreamProducers.foreach(s => toBeAddedEdges += StreamConnector(s, t, strategy))
+          }
+
+          // connect the new executor node to each downstream target of the replaced child
+          val outgoingEdges = dag.outgoingEdgesOf(child)
+          outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(t, e.to, e))
+        })
+        
+        // queue the replaced aggregate node (child) for removal
+        toBeRemovedVertex += child
+      }
+      case _ => // any other kind of child is left untouched
+    }
+  }
+  
+}
+
+object StreamAggregateExpansion{ // companion factory
+  def apply()(implicit config:Config, dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): StreamAggregateExpansion ={ // builds the expansion from the implicit config and runs it on the implicit dag
+    val e = new StreamAggregateExpansion(config)
+    e.expand(dag) // expansion is applied immediately, before the instance is returned
+    e
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
index 84532dc..c731ac9 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
@@ -1,4 +1,5 @@
 /*
+
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,18 +20,23 @@ package org.apache.eagle.datastream.core
 
 import java.util
 
-import com.typesafe.config.Config
-import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl
-import org.apache.eagle.datastream._
+import org.apache.eagle.alert.executor.AlertExecutorCreationUtils
+import org.apache.eagle.policy.dao.AlertDefinitionDAOImpl
+
+import scala.collection.JavaConversions.asScalaSet
+import scala.collection.mutable.ListBuffer
+import org.apache.eagle.datastream.EagleTuple
+import org.apache.eagle.datastream.JavaStormExecutorForAlertWrapper
+import org.apache.eagle.datastream.JavaStormStreamExecutor
+import org.apache.eagle.datastream.StormStreamExecutor
+import org.apache.eagle.datastream.Tuple2
 import org.apache.eagle.datastream.storm.StormExecutorForAlertWrapper
 import org.apache.eagle.datastream.utils.AlertExecutorConsumerUtils
-import org.apache.eagle.executor.AlertExecutorCreationUtils
 import org.apache.eagle.service.client.EagleServiceConnector
 import org.jgrapht.experimental.dag.DirectedAcyclicGraph
 import org.slf4j.LoggerFactory
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ListBuffer
+import com.typesafe.config.Config
 
 /**
  * The constraints for alert is:
@@ -69,7 +75,7 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
     toBeRemovedVertex.foreach(v => dag.removeVertex(v))
   }
 
-  def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
+  def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], 
                dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], child: StreamProducer[Any]): Unit = {
     child match {
       case AlertStreamSink(upStreamNames, alertExecutorId, withConsumer,strategy) => {
@@ -77,28 +83,7 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
          * step 1: wrapper previous StreamProducer with one more field "streamName"
          * for AlertStreamSink, we check previous StreamProducer and replace that
          */
-        val newStreamProducers = new ListBuffer[StreamProducer[Any]]
-        current match {
-          case StreamUnionProducer(others) => {
-            val incomingEdges = dag.incomingEdgesOf(current)
-            incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0)))
-            var i: Int = 1
-            others.foreach(o => {
-              newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i))
-              i += 1
-            })
-          }
-          case _: FlatMapProducer[AnyRef, AnyRef] => {
-            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
-          }
-          case _: MapperProducer[AnyRef,AnyRef] => {
-            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
-          }
-          case s: StreamProducer[AnyRef] if dag.inDegreeOf(s) == 0 => {
-            newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
-          }
-          case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported")
-        }
+        val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
 
         /**
          * step 2: partition alert executor by policy partitioner class
@@ -106,7 +91,7 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
         val alertExecutors = AlertExecutorCreationUtils.createAlertExecutors(config, new AlertDefinitionDAOImpl(new EagleServiceConnector(config)), upStreamNames, alertExecutorId)
         var alertProducers = new scala.collection.mutable.MutableList[StreamProducer[Any]]
         alertExecutors.foreach(exec => {
-          val t = FlatMapProducer(exec).nameAs(exec.getAlertExecutorId + "_" + exec.getPartitionSeq).initWith(dag,config,hook = false)
+          val t = FlatMapProducer(exec).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).initWith(dag,config, hook = false)
           alertProducers += t
           newStreamProducers.foreach(newsp => toBeAddedEdges += StreamConnector[Any,Any](newsp, t,Seq(0)))
           if (strategy == null) {
@@ -129,7 +114,36 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
     }
   }
 
-  private def replace(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
+  protected def rewriteWithStreamOutputWrapper(current: org.apache.eagle.datastream.core.StreamProducer[Any], dag: org.jgrapht.experimental.dag.DirectedAcyclicGraph[org.apache.eagle.datastream.core.StreamProducer[Any],org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeAddedEdges: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeRemovedVertex: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamProducer[Any]], upStreamNames: java.util.List[String]) = {/**
+     * step 1: wrap the previous StreamProducer with one more field "streamName";
+     * for AlertStreamSink (and aggregate nodes), we check the previous StreamProducer and replace it
+     */
+    val newStreamProducers = new ListBuffer[StreamProducer[Any]]
+    current match {
+      case StreamUnionProducer(others) => {
+        val incomingEdges = dag.incomingEdgesOf(current)
+        incomingEdges.foreach(e => newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, e.from, upStreamNames.get(0)))
+        var i: Int = 1 // name 0 is used for the union's incoming edges; the other unioned streams take names from index 1 on
+        others.foreach(o => {
+          newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, o, upStreamNames.get(i))
+          i += 1
+        })
+      }
+      case _: FlatMapProducer[AnyRef, AnyRef] => {
+        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+      }
+      case _: MapperProducer[AnyRef,AnyRef] => {
+        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+      }
+      case s: StreamProducer[AnyRef] if dag.inDegreeOf(s) == 0 => { // a producer with no incoming edges
+        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+      }
+      case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported")
+    }
+    newStreamProducers // the replacement producers, in the order they were created
+  }
+
+  protected def replace(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
                       dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], upStreamName: String) : StreamProducer[Any]= {
     var newsp: StreamProducer[Any] = null
     current match {
@@ -201,3 +215,4 @@ object StreamAlertExpansion{
   }
 }
 
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
index 4d7dcd1..4d81424 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
@@ -24,13 +24,14 @@ import java.util.concurrent.atomic.AtomicInteger
 import backtype.storm.topology.base.BaseRichSpout
 import com.typesafe.config.Config
 import org.apache.eagle.alert.entity.AlertAPIEntity
-import org.apache.eagle.datastream._
+import org.apache.eagle.datastream.FlatMapper
 import org.apache.eagle.partition.PartitionStrategy
+import org.apache.eagle.policy.common.Constants
 import org.jgrapht.experimental.dag.DirectedAcyclicGraph
 import org.slf4j.LoggerFactory
 
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
+import scala.collection.JavaConversions.{asScalaBuffer, seqAsJavaList}
+import scala.collection.JavaConverters.asScalaBufferConverter
 /**
  * StreamProducer = StreamInfo + StreamProtocol
  *
@@ -193,6 +194,24 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
     alert(util.Arrays.asList(upStreamName), alertExecutorId, consume = false)
   }
 
+  def aggregate(upStreamNames: java.util.List[String], queryExecutorId : String, strategy: PartitionStrategy = null): StreamProducer[T] = { // aggregate by executor id, without an inline query
+    val ret= AggregateProducer(upStreamNames, queryExecutorId, null, strategy) // cepQl is null in this overload
+    hookup(this, ret) // adds ret to the graph with an edge from this producer
+    ret
+  }
+
+  def aggregate(cql : String, strategy: PartitionStrategy): StreamProducer[T] = { // aggregate with an inline query string
+    val ret= AggregateProducer(util.Arrays.asList(Constants.EAGLE_DEFAULT_POLICY_NAME), null, cql, strategy) // single default upstream name, no analyzer id
+    hookup(this, ret) // adds ret to the graph with an edge from this producer
+    ret
+  }
+  
+  def persist(executorId : String, storageType: StorageType.StorageType) : StreamProducer[T] = { // appends a persist node for the given executor id and storage type
+    val ret = PersistProducer(executorId, storageType)
+    hookup(this, ret) // adds ret to the graph with an edge from this producer
+    ret
+  }
+
   protected def hookup[T1,T2](current: StreamProducer[T1], next: StreamProducer[T2]) = {
     current.graph.addVertex(next)
     current.graph.addEdge(current, next, StreamConnector(current, next))
@@ -216,7 +235,7 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
 
   /**
    * Component name
-   * 
+   *
    * @param componentName component name
    * @return
    */
@@ -278,6 +297,10 @@ case class AlertStreamSink(upStreamNames: util.List[String], alertExecutorId : S
   }
 }
 
+case class AggregateProducer[T](upStreamNames: util.List[String], analyzerId : String, cepQl: String = null, strategy:PartitionStrategy = null) extends StreamProducer[T] // aggregate node; StreamAggregateExpansion pattern-matches on it
+
+case class PersistProducer[T](executorId :String, storageType: StorageType.StorageType) extends StreamProducer[T] // persist node created by StreamProducer.persist
+
 object UniqueId{
   val id : AtomicInteger = new AtomicInteger(0);
   def incrementAndGetId() : Int = {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
index 346f728..3ed067d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
@@ -83,6 +83,12 @@ class StreamInfo  extends Serializable{
   override def hashCode(): Int = new HashCodeBuilder().append(this.id).append(this.getClass).toHashCode
 }
 
+
+object StorageType extends Enumeration { // storage targets accepted by persist(...)
+  type StorageType = Value // alias so signatures can be written as StorageType.StorageType
+  val KAFKA, DRUID, HBASE = Value
+}
+
 /**
  * Stream interaction protocol interface
  *
@@ -144,8 +150,16 @@ trait StreamProtocol[+T <: Any]{
    * @return
    */
   def groupByKey(keyer:T => Any):StreamProducer[T]
+
   def streamUnion[T2,T3](otherStreams : Seq[StreamProducer[T2]]) : StreamProducer[T3]
   def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean,strategy : PartitionStrategy)
+
+  def aggregate(upStreamNames: java.util.List[String], executorId :String, strategy:PartitionStrategy): StreamProducer[T]
+
+  def aggregate(cql : String, strategy:PartitionStrategy): StreamProducer[T]
+
+  def persist(executorId : String, storageType: StorageType.StorageType): StreamProducer[T]
+  
   /**
    * Set processing element parallelism setting
    * @param parallelismNum parallelism value
@@ -173,4 +187,4 @@ trait StreamProtocol[+T <: Any]{
   def ~>[R](flatMapper : FlatMapper[R]) = this.flatMap[R](flatMapper)
   def ! (upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean = true,strategy: PartitionStrategy = null) = alert(upStreamNames, alertExecutorId, consume,strategy)
 
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
index b048b90..29b5cf4 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
@@ -20,6 +20,7 @@ package org.apache.eagle.datastream.storm
 
 import backtype.storm.topology.base.BaseRichBolt
 import com.typesafe.config.Config
+import org.apache.eagle.dataproc.impl.persist.PersistExecutor
 import org.apache.eagle.datastream._
 import org.apache.eagle.datastream.core._
 
@@ -47,6 +48,11 @@ object StormBoltFactory {
       case foreach:ForeachProducer[Any] => {
         ForeachBoltWrapper(foreach.fn)
       }
+      case persist : PersistProducer[Any] => {
+        val persisExecutor = new PersistExecutor(persist.executorId, persist.storageType.toString());
+        persisExecutor.prepareConfig(config);
+        JavaStormBoltWrapper(persisExecutor.asInstanceOf[JavaStormStreamExecutor[EagleTuple]])
+      }
       case _ => throw new UnsupportedOperationException(s"Unsupported producer: ${producer.toString}")
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
index 1b0d133..bb34972 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutorForAlertWrapper.scala
@@ -18,8 +18,13 @@ package org.apache.eagle.datastream.storm
 
 import java.util
 
+import org.apache.eagle.datastream.Collector
+import org.apache.eagle.datastream.StormStreamExecutor
+import org.apache.eagle.datastream.StormStreamExecutor3
+import org.apache.eagle.datastream.Tuple2
+import org.apache.eagle.datastream.Tuple3
+
 import com.typesafe.config.Config
-import org.apache.eagle.datastream._
 
 case class StormExecutorForAlertWrapper(delegate: StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]], streamName: String)
   extends StormStreamExecutor3[String, String, util.SortedMap[Object, Object]]{

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
index 67818e1..76250e2 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
@@ -22,10 +22,10 @@ package org.apache.eagle.datastream.utils
 import java.util
 
 import org.apache.eagle.alert.dedup.{AlertEmailDeduplicationExecutor, AlertEntityDeduplicationExecutor}
+import org.apache.eagle.alert.executor.AlertExecutor
 import org.apache.eagle.alert.notification.AlertNotificationExecutor
 import org.apache.eagle.alert.persist.AlertPersistExecutor
 import org.apache.eagle.datastream.core.{StreamConnector, FlatMapProducer, StreamProducer}
-import org.apache.eagle.executor.AlertExecutor
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.mutable.ListBuffer
@@ -54,8 +54,8 @@ object AlertExecutorConsumerUtils {
   def setupAlertConsumers(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], alertStreamProducers: List[StreamProducer[Any]]): Unit = {
     val alertExecutorIdList: java.util.List[String] = new util.ArrayList[String]()
     alertStreamProducers.map(x =>
-      alertExecutorIdList.add(x.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getAlertExecutorId));
-    val alertDefDao = alertStreamProducers.head.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getAlertDefinitionDao
+      alertExecutorIdList.add(x.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getExecutorId));
+    val alertDefDao = alertStreamProducers.head.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getPolicyDefinitionDao
     val entityDedupExecutor: AlertEntityDeduplicationExecutor = new AlertEntityDeduplicationExecutor(alertExecutorIdList, alertDefDao)
     val emailDedupExecutor: AlertEmailDeduplicationExecutor = new AlertEmailDeduplicationExecutor(alertExecutorIdList, alertDefDao)
     val notificationExecutor: AlertNotificationExecutor = new AlertNotificationExecutor(alertExecutorIdList, alertDefDao)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
new file mode 100644
index 0000000..ed5d705
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/java/org/apache/eagle/datastream/TestStreamAggregate.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+import org.apache.eagle.datastream.core.*;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+import org.apache.eagle.partition.PartitionStrategy;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph;
+import org.junit.Before;
+import org.junit.Test;
+import scala.collection.Seq;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Verifies DAG expansion of the aggregate DSL using a Storm test environment.
+ * @since Dec 18, 2015
+ */
+public class TestStreamAggregate {
+
+	private Config config;
+
+	@SuppressWarnings("serial")
+	private final class SimpleSpout extends BaseRichSpout {
+		@SuppressWarnings("rawtypes")
+		@Override
+		public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+		}
+		@Override
+		public void nextTuple() {
+		}
+		@Override
+		public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		}
+	}
+
+	public static class TestEnvironment extends StormExecutionEnvironment {
+		private static final long serialVersionUID = 1L;
+		public TestEnvironment(Config conf) {
+			super(conf);
+		}
+		@Override
+		public void execute(StreamDAG dag) {
+			System.out.println("DAT completed!");
+		}
+	}
+	
+	public static class DummyStrategy implements PartitionStrategy {
+		private static final long serialVersionUID = 1L;
+		@Override
+		public int balance(String key, int buckNum) {
+			return 0;
+		}
+	};
+	
+	public static class DummyExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity>  {
+		@Override
+		public void prepareConfig(Config config) {
+		}
+		@Override
+		public void init() {
+		}
+		@Override
+		public void flatMap(List input, Collector collector) {
+		}
+	}
+	
+	@Before
+	public void setUp() {
+		System.setProperty("config.resource", "/application.conf");
+		ConfigFactory.invalidateCaches();
+		config = ConfigFactory.load();
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes", "serial" })
+	@Test
+	public void testAggregate1() {
+		StormExecutionEnvironment exe = new TestEnvironment(config);
+		
+		BaseRichSpout spout = new SimpleSpout();
+		StormSourceProducer ssp = exe.fromSpout(spout);
+		
+		ssp.flatMap(new FlatMapper<String>() {
+			@Override
+			public void flatMap(Seq<Object> input, Collector<String> collector) {
+				// do nothing
+			}
+		}).aggregate(Arrays.asList("c3EsLogEventStream"), "qid", new DummyStrategy());
+		
+		try {
+			exe.execute();
+			Assert.fail("customzied flat mapper(non java storm executor) before analyze is not supported!");
+		} catch (Exception e ){
+		}
+	}
+	
+	@SuppressWarnings({ "unchecked", "rawtypes", "serial" })
+	@Test
+	public void testAggregate() {
+		StormExecutionEnvironment exe = new TestEnvironment(config);
+		StormSourceProducer ssp = exe.fromSpout(new SimpleSpout());
+		DummyExecutor dummy = new DummyExecutor();
+		ssp.flatMap(dummy).aggregate(Arrays.asList("c3EsLogEventStream"), "analyzeStreamExecutor", new DummyStrategy());
+
+		try {
+			exe.execute();
+		} catch (Exception e) {
+			Assert.fail("customized flat mapper before");
+		}
+		// Assertion
+		DirectedAcyclicGraph<StreamProducer<Object>, StreamConnector<Object, Object>> dag = exe.dag();
+		Assert.assertEquals("three vertex", 3, dag.vertexSet().size());
+		boolean hasWrapped = false;
+		for (StreamProducer<Object> obj : dag.vertexSet()) {
+			if (obj instanceof FlatMapProducer) {
+				if (((FlatMapProducer) obj).mapper() instanceof JavaStormExecutorForAlertWrapper) {
+					hasWrapped = true;
+					Assert.assertEquals("dummy executor should be wrapped in the alert wrapper func", dummy,
+							((JavaStormExecutorForAlertWrapper) ((FlatMapProducer) obj).mapper() ).getDelegate());
+
+				}
+			}
+		}
+		Assert.assertTrue(hasWrapped);
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
index c098a7a..7b80481 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/resources/application.conf
@@ -36,7 +36,24 @@
 	"alertExecutorConfigs" : {
 		"eventStreamExecutor" : {
 			"parallelism" : 1,
-			"partitioner" : "org.apache.eagle.alert.policy.DefaultPolicyPartitioner"
+			"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+			"needValidation" : "true"
+		}
+	},
+	"persistExecutorConfigs" {
+		"persistExecutor1" : {
+			"kafka": {
+				"bootstrap_servers" : "localhost",
+				"topics" : {
+					"defaultOutput" : "downSampleOutput"
+				}
+			}
+		}
+	},
+	"aggregateExecutorConfigs" : {
+		"aggregateStreamExecutor" : {
+			"parallelism" : 1,
+			"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
 			"needValidation" : "true"
 		}
 	},

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
index 22b3e5c..5005c42 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLAnomalyCallback.java
@@ -16,7 +16,7 @@
  */
 package org.apache.eagle.ml;
 
-import org.apache.eagle.alert.siddhi.EagleAlertContext;
+import org.apache.eagle.policy.PolicyEvaluationContext;
 import org.apache.eagle.ml.model.MLCallbackResult;
 
 public interface MLAnomalyCallback {
@@ -24,5 +24,5 @@ public interface MLAnomalyCallback {
      * @param callbackResult call-backed result
      * @param alertContext context
      */
-	void receive(MLCallbackResult callbackResult,EagleAlertContext alertContext);
+	void receive(MLCallbackResult callbackResult,PolicyEvaluationContext alertContext);
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
index c2a8d6b..ff88a7a 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/MLPolicyEvaluator.java
@@ -16,27 +16,31 @@
  */
 package org.apache.eagle.ml;
 
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.config.AbstractPolicyDefinition;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.config.AbstractPolicyDefinition;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.policy.PolicyEvaluator;
-import org.apache.eagle.alert.policy.PolicyManager;
+import org.apache.eagle.policy.PolicyEvaluator;
+import org.apache.eagle.policy.PolicyManager;
 import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
 import org.apache.eagle.dataproc.core.ValuesArray;
 import org.apache.eagle.ml.impl.MLAnomalyCallbackImpl;
 import org.apache.eagle.ml.model.MLAlgorithm;
 import org.apache.eagle.ml.model.MLPolicyDefinition;
 import org.apache.eagle.ml.utils.MLReflectionUtils;
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import com.typesafe.config.Config;
 
-public class MLPolicyEvaluator implements PolicyEvaluator {
+public class MLPolicyEvaluator implements PolicyEvaluator<AlertDefinitionAPIEntity> {
 	private static Logger LOG = LoggerFactory.getLogger(MLPolicyEvaluator.class);
-    private final String[] sourceStreams;
     private volatile MLRuntime mlRuntime;
 	private String policyName;
 	private Config config;
@@ -61,10 +65,9 @@ public class MLPolicyEvaluator implements PolicyEvaluator {
 	public MLPolicyEvaluator(Config config, String policyName, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){
 		this.config = config;
         this.policyName = policyName;
-        this.sourceStreams = sourceStreams;
         LOG.info("Initializing policy named: "+policyName);
         this.context = new HashMap<>();
-        this.context.put(AlertConstants.SOURCE_STREAMS, StringUtils.join(sourceStreams,","));
+        this.context.put(Constants.SOURCE_STREAMS, StringUtils.join(sourceStreams,","));
 		this.init(policyDef);
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
index 4684d8d..c74826b 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLAnomalyCallbackImpl.java
@@ -16,23 +16,24 @@
  */
 package org.apache.eagle.ml.impl;
 
-import org.apache.eagle.alert.common.AlertConstants;
+import java.lang.management.ManagementFactory;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.siddhi.EagleAlertContext;
+import org.apache.eagle.policy.PolicyEvaluationContext;
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.common.metric.AlertContext;
 import org.apache.eagle.ml.MLAnomalyCallback;
 import org.apache.eagle.ml.MLPolicyEvaluator;
 import org.apache.eagle.ml.model.MLCallbackResult;
-import org.apache.eagle.common.metric.AlertContext;
-import com.typesafe.config.Config;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.management.ManagementFactory;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
+import com.typesafe.config.Config;
 
 public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
 	private static Logger LOG = LoggerFactory.getLogger(MLAnomalyCallbackImpl.class);
@@ -54,13 +55,13 @@ public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
      * @param alertContext context
      */
 	@Override
-	public void receive(MLCallbackResult aResult,EagleAlertContext alertContext) {
+	public void receive(MLCallbackResult aResult,PolicyEvaluationContext alertContext) {
 		LOG.info("Receive called with : " + aResult.toString());
         AlertAPIEntity alert = renderAlert(aResult,alertContext);
-        alertContext.alertExecutor.onAlerts(alertContext, Arrays.asList(alert));
+        alertContext.alertExecutor.onEvalEvents(alertContext, Arrays.asList(alert));
 	}
 
-    private AlertAPIEntity renderAlert(MLCallbackResult aResult,EagleAlertContext alertContext){
+    private AlertAPIEntity renderAlert(MLCallbackResult aResult,PolicyEvaluationContext alertContext){
         String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
         String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
 
@@ -70,10 +71,10 @@ public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
         Map<String, String> tags = new HashMap<>();
         tags.put(EagleConfigConstants.SITE, site);
         tags.put(EagleConfigConstants.DATA_SOURCE, dataSource);
-        tags.put(AlertConstants.SOURCE_STREAMS, alertContext.evaluator.getAdditionalContext().get(AlertConstants.SOURCE_STREAMS));
-        tags.put(AlertConstants.POLICY_ID, alertContext.policyId);
-        tags.put(AlertConstants.ALERT_SOURCE, source);
-        tags.put(AlertConstants.ALERT_EXECUTOR_ID, alertContext.alertExecutor.getAlertExecutorId());
+        tags.put(Constants.SOURCE_STREAMS, (String)alertContext.evaluator.getAdditionalContext().get(Constants.SOURCE_STREAMS));
+        tags.put(Constants.POLICY_ID, alertContext.policyId);
+        tags.put(Constants.ALERT_SOURCE, source);
+        tags.put(Constants.ALERT_EXECUTOR_ID, alertContext.alertExecutor.getExecutorId());
         entity.setTags(tags);
 
         entity.setTimestamp(aResult.getTimestamp());
@@ -83,9 +84,9 @@ public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
         if(aResult.getContext() != null) context.addAll(aResult.getContext());
 
         String alertMessage = "Anomaly activities detected by algorithm ["+aResult.getAlgorithmName()+"] with information: " + aResult.toString() ;
-        context.addProperty(AlertConstants.ALERT_EVENT, aResult.toString());
-        context.addProperty(AlertConstants.ALERT_MESSAGE, alertMessage);
-        context.addProperty(AlertConstants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
+        context.addProperty(Constants.ALERT_EVENT, aResult.toString());
+        context.addProperty(Constants.ALERT_MESSAGE, alertMessage);
+        context.addProperty(Constants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
 
         try {
             site = config.getString("eagleProps.site");
@@ -98,7 +99,7 @@ public class MLAnomalyCallbackImpl implements MLAnomalyCallback {
 
         context.addProperty(EagleConfigConstants.DATA_SOURCE, dataSource);
         context.addProperty(EagleConfigConstants.SITE, site);
-        context.addProperty(AlertConstants.POLICY_NAME, alertContext.policyId);
+        context.addProperty(Constants.POLICY_NAME, alertContext.policyId);
 
         entity.setAlertContext(context);
         return entity;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
index f61a902..d0ac75f 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/impl/MLPolicyEvaluatorServiceProviderImpl.java
@@ -16,9 +16,9 @@
  */
 package org.apache.eagle.ml.impl;
 
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.policy.PolicyEvaluator;
-import org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.PolicyEvaluator;
+import org.apache.eagle.policy.PolicyEvaluatorServiceProvider;
 import org.apache.eagle.ml.MLPolicyEvaluator;
 import org.apache.eagle.ml.model.MLPolicyDefinition;
 import com.fasterxml.jackson.databind.Module;
@@ -35,7 +35,7 @@ public class MLPolicyEvaluatorServiceProviderImpl implements PolicyEvaluatorServ
 
 	@Override
 	public String getPolicyType() {
-		return AlertConstants.policyType.MachineLearning.name();
+		return Constants.policyType.MachineLearning.name();
 	}
 
 	@Override
@@ -45,7 +45,7 @@ public class MLPolicyEvaluatorServiceProviderImpl implements PolicyEvaluatorServ
 
 	@Override
 	public List<Module> getBindingModules() {
-		Module module1 = new SimpleModule(AlertConstants.POLICY_DEFINITION).registerSubtypes(new NamedType(MLPolicyDefinition.class, getPolicyType()));
+		Module module1 = new SimpleModule(Constants.POLICY_DEFINITION).registerSubtypes(new NamedType(MLPolicyDefinition.class, getPolicyType()));
 		Module module2 = new SimpleModule(ALERT_CONTEXT).registerSubtypes(new NamedType(Properties.class, getPolicyType()));
 		return Arrays.asList(module1, module2);
 	}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
index 71fabbf..70e0f71 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/java/org/apache/eagle/ml/model/MLPolicyDefinition.java
@@ -16,7 +16,7 @@
  */
 package org.apache.eagle.ml.model;
 
-import org.apache.eagle.alert.config.AbstractPolicyDefinition;
+import org.apache.eagle.policy.config.AbstractPolicyDefinition;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index d7a2754..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
new file mode 100644
index 0000000..d7a2754
--- /dev/null
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/main/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index d7a2754..0000000
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
new file mode 100644
index 0000000..d7a2754
--- /dev/null
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/META-INF/services/org.apache.eagle.policy.PolicyEvaluatorServiceProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.ml.impl.MLPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
index 4d8f08c..657011c 100644
--- a/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
+++ b/eagle-core/eagle-machinelearning/eagle-machinelearning-base/src/test/resources/application.conf
@@ -46,7 +46,7 @@
   "alertExecutorConfigs" : {
      "userAnomalousActivityDetectionAlertExecutor" : {
        "parallelism" : 2,
-       "partitioner" : "org.apache.eagle.alert.policy.DefaultPolicyPartitioner"
+       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
      }
   },
   "dynamicConfigSource" : {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/pom.xml b/eagle-core/eagle-policy/eagle-policy-base/pom.xml
new file mode 100644
index 0000000..5f2b9f8
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent> <!-- this module takes its groupId and version from the parent declared here -->
+        <groupId>eagle</groupId>
+        <artifactId>eagle-policy-parent</artifactId>
+        <version>0.3.0</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>eagle-policy-base</artifactId>
+    <packaging>jar</packaging>
+    <name>eagle-policy-base</name>
+
+    <dependencies> <!-- the four eagle artifacts below are pinned to this module's own version through ${project.version} -->
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-entity-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-client-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-metric</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-stream-process-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-extension-string</artifactId>
+            <version>${siddhi.version}</version> <!-- siddhi.version is not defined in this POM; presumably inherited from eagle-policy-parent - TODO confirm -->
+        </dependency>
+    </dependencies>
+</project>
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
new file mode 100644
index 0000000..3f45be7
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+
+@SuppressWarnings("serial")
+public abstract class AbstractPolicyDefinitionEntity extends TaggedLogAPIEntity { // Abstract base for policy-definition entities; subclasses must supply the two accessors below.
+	
+	public abstract String getPolicyDef(); // presumably the policy definition in its stored string form - TODO confirm against implementers
+	
+	public abstract boolean isEnabled(); // presumably whether the policy is currently active - TODO confirm against implementers
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
new file mode 100644
index 0000000..0e5d3c8
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Alert record entity. The annotations below bind it to table "alertdetail",
+ * column family "f" and prefix "hadoop", and flag it as a time series.
+ * Every setter assigns its field and then calls {@code _pcs.firePropertyChange}
+ * with the column name on the {@code _pcs} member inherited from the base class.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertdetail")
+@ColumnFamily("f")
+@Prefix("hadoop")
+@Service(Constants.ALERT_SERVICE_ENDPOINT_NAME)
+@TimeSeries(true)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AlertAPIEntity extends TaggedLogAPIEntity{
+	@Column("description")
+	private String description;
+	@Column("remediationID")
+	private String remediationID;
+	@Column("remediationCallback")
+	private String remediationCallback;
+	@Column("alertContext")
+	private AlertContext alertContext;
+	@Column("streamId")
+	private String streamId;
+
+	public String getDescription() {
+		return description;
+	}
+
+	public void setDescription(String description) {
+		this.description = description;
+		_pcs.firePropertyChange("description", null, null);
+	}
+
+	public String getRemediationID() {
+		return remediationID;
+	}
+
+	public void setRemediationID(String remediationID) {
+		this.remediationID = remediationID;
+		_pcs.firePropertyChange("remediationID", null, null);
+	}
+
+	public String getRemediationCallback() {
+		return remediationCallback;
+	}
+
+	public void setRemediationCallback(String remediationCallback) {
+		this.remediationCallback = remediationCallback;
+		_pcs.firePropertyChange("remediationCallback", null, null);
+	}
+
+	public AlertContext getAlertContext() {
+		return alertContext;
+	}
+	
+	public void setAlertContext(AlertContext alertContext) {
+		this.alertContext = alertContext;
+		_pcs.firePropertyChange("alertContext", null, null);
+	}
+
+	public String getStreamId() {
+		return streamId;
+	}
+
+	/**
+	 * Sets the stream id and, like every other setter in this class, fires a
+	 * property-change notification named after the "streamId" column.
+	 */
+	public void setStreamId(String streamId) {
+		this.streamId = streamId;
+		_pcs.firePropertyChange("streamId", null, null);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java
new file mode 100644
index 0000000..f001725
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertContextSerDeser.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.eagle.log.entity.meta.EntitySerDeser;
+
+/**
+ * Converts an {@link AlertContext} to and from a length-prefixed byte layout.
+ *
+ * <p>Layout: a 4-byte entry count, then for every entry a 4-byte key length,
+ * a 4-byte value length, the key bytes and the value bytes.
+ *
+ * <p>Strings are encoded with {@code Bytes.toBytes(String)} and decoded with
+ * {@code Bytes.toString}, so both directions go through the same utility
+ * (UTF-8 per the HBase documentation) instead of mixing it with the platform
+ * default charset of {@code String.getBytes()}. On platforms whose default
+ * charset is already UTF-8 the produced bytes are unchanged.
+ *
+ * <p>A null key or value is written with length 0, so it does not survive a
+ * round trip as null.
+ */
+public class AlertContextSerDeser implements EntitySerDeser<AlertContext> {
+
+	/** Shared encoding of a null string: zero bytes. */
+	private static final byte[] EMPTY = new byte[0];
+
+	/**
+	 * Rebuilds a context from bytes in the layout written by
+	 * {@link #serialize(AlertContext)}.
+	 *
+	 * <p>The input is not validated beyond its minimum length; truncated or
+	 * corrupt data fails inside the {@code Bytes} calls.
+	 *
+	 * @param bytes the serialized form; when it is null or shorter than 4
+	 *              bytes a freshly constructed context is returned untouched
+	 * @return a new context to which every decoded key/value pair was added
+	 */
+	@Override
+	public AlertContext deserialize(byte[] bytes) {
+		AlertContext context = new AlertContext();
+		// A null array used to fail with a NullPointerException; treat it like
+		// an input that is too short to hold the entry count.
+		if (bytes == null || bytes.length < 4) {
+			return context;
+		}
+		final int size = Bytes.toInt(bytes, 0, 4);
+		final Map<String, String> properties = new HashMap<String, String>();
+
+		int offset = 4;
+		for (int i = 0; i < size; i++) {
+			// Both lengths precede the key and value payloads.
+			final int keySize = Bytes.toInt(bytes, offset, 4);
+			offset += 4;
+			final int valueSize = Bytes.toInt(bytes, offset, 4);
+			offset += 4;
+			final String key = Bytes.toString(bytes, offset, keySize);
+			offset += keySize;
+			final String value = Bytes.toString(bytes, offset, valueSize);
+			offset += valueSize;
+			properties.put(key, value);
+		}
+		context.addAll(properties);
+		return context;
+	}
+
+	/**
+	 * Writes the properties of the given context in the layout described on
+	 * this class.
+	 *
+	 * @param context the context whose {@code getProperties()} map is written;
+	 *                must not be null
+	 * @return a buffer sized exactly for the entry count and all entries
+	 */
+	@Override
+	public byte[] serialize(AlertContext context) {
+		final Map<String, String> pair = context.getProperties();
+		final int entryCount = pair.size();
+
+		// Encode every string exactly once so that the size computation and
+		// the write pass are guaranteed to agree on the byte lengths.
+		final byte[][] keys = new byte[entryCount][];
+		final byte[][] values = new byte[entryCount][];
+		int totalSize = 4;
+		int index = 0;
+		for (Entry<String, String> entry : pair.entrySet()) {
+			keys[index] = encode(entry.getKey());
+			values[index] = encode(entry.getValue());
+			// 8 = the two 4-byte length prefixes of this entry.
+			totalSize += keys[index].length + values[index].length + 8;
+			index++;
+		}
+
+		final byte[] buffer = new byte[totalSize];
+		Bytes.putInt(buffer, 0, entryCount);
+		int offset = 4;
+		for (int i = 0; i < entryCount; i++) {
+			Bytes.putInt(buffer, offset, keys[i].length);
+			offset += 4;
+			Bytes.putInt(buffer, offset, values[i].length);
+			offset += 4;
+			Bytes.putBytes(buffer, offset, keys[i], 0, keys[i].length);
+			offset += keys[i].length;
+			Bytes.putBytes(buffer, offset, values[i], 0, values[i].length);
+			offset += values[i].length;
+		}
+		return buffer;
+	}
+
+	/** Encodes one string with {@code Bytes.toBytes}; null becomes zero bytes. */
+	private static byte[] encode(String text) {
+		return text == null ? EMPTY : Bytes.toBytes(text);
+	}
+
+	/** @return the class handled by this serializer, {@code AlertContext.class} */
+	@Override
+	public Class<AlertContext> type(){
+		return AlertContext.class;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java
new file mode 100644
index 0000000..173481d
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDataSourceEntity.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Entity describing one alert data source.
+ *
+ * <p>Per the annotations below it uses table and prefix
+ * {@code alertDataSource}, column family {@code f}, is not a time series and
+ * is tagged by {@code site} and {@code dataSource}. Every setter stores the
+ * new value and then passes the field name to {@code valueChanged}, which is
+ * not defined in this class (presumably inherited from
+ * {@code TaggedLogAPIEntity}).
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertDataSource")
+@ColumnFamily("f")
+@Prefix("alertDataSource")
+@Service(Constants.ALERT_DATA_SOURCE_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"site", "dataSource"})
+public class AlertDataSourceEntity extends TaggedLogAPIEntity{
+    /** Stored in column {@code a}. */
+    @Column("a")
+    private boolean enabled;
+    /** Stored in column {@code b}. */
+    @Column("b")
+    private String config;
+    /** Stored in column {@code c}. */
+    @Column("c")
+    private String desc;
+
+    /** @return the stored {@code config} value */
+    public String getConfig() {
+        return this.config;
+    }
+
+    /**
+     * Stores {@code config} and reports the field {@code "config"} as changed.
+     *
+     * @param config the value to store
+     */
+    public void setConfig(final String config) {
+        this.config = config;
+        valueChanged("config");
+    }
+
+    /** @return the stored {@code enabled} flag */
+    public boolean isEnabled() {
+        return this.enabled;
+    }
+
+    /**
+     * Stores {@code enabled} and reports the field {@code "enabled"} as changed.
+     *
+     * @param enabled the value to store
+     */
+    public void setEnabled(final boolean enabled) {
+        this.enabled = enabled;
+        valueChanged("enabled");
+    }
+
+    /** @return the stored {@code desc} value */
+    public String getDesc() {
+        return this.desc;
+    }
+
+    /**
+     * Stores {@code desc} and reports the field {@code "desc"} as changed.
+     *
+     * @param desc the value to store
+     */
+    public void setDesc(final String desc) {
+        this.desc = desc;
+        valueChanged("desc");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java
new file mode 100644
index 0000000..727506a
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertDefinitionAPIEntity.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Index;
+import org.apache.eagle.log.entity.meta.Indexes;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Alert policy definition entity.
+ *
+ * <p>Tags:
+ * <ul>
+ *   <li>site: site name</li>
+ *   <li>dataSource: data source name</li>
+ *   <li>alertExecutorId: groups policies by alert executor; policy
+ *       definitions sharing the same ["site", "dataSource",
+ *       "alertExecutorId"] should run on the same alert executor</li>
+ *   <li>policyId: policy name, should be unique</li>
+ *   <li>policyType: policy engine implementation type</li>
+ * </ul>
+ *
+ * <p>{@link #equals(Object)} and {@link #hashCode()} consider only
+ * {@code enabled}, {@code policyDef}, {@code dedupeDef},
+ * {@code notificationDef} and {@code remediationDef}; the remaining columns
+ * do not take part in equality.
+ *
+ * <p>NOTE(review): the index below names the column {@code alertExecutorID}
+ * while the tag is spelled {@code alertExecutorId}; confirm which spelling
+ * the index lookup expects before relying on this index.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertdef")
+@ColumnFamily("f")
+@Prefix("alertdef")
+@Service(Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"site", "dataSource", "alertExecutorId", "policyId", "policyType"})
+@Indexes({
+	@Index(name="Index_1_alertExecutorId", columns = { "alertExecutorID" }, unique = true),
+})
+public class AlertDefinitionAPIEntity extends AbstractPolicyDefinitionEntity {
+	@Column("a")
+	private String desc;
+	@Column("b")
+	private String policyDef;
+	@Column("c")
+	private String dedupeDef;
+	@Column("d")
+	private String notificationDef;
+	@Column("e")
+	private String remediationDef;
+	@Column("f")
+	private boolean enabled;
+	@Column("g")
+	private String owner;
+	@Column("h")
+	private long lastModifiedDate;
+	@Column("i")
+	private long severity;
+	@Column("j")
+	private long createdTime;
+
+	// Each setter below stores the value and then reports the field name
+	// through valueChanged(...), which is not defined in this class.
+
+	/** @return the stored {@code desc} value */
+	public String getDesc() {
+		return this.desc;
+	}
+	/** @param desc the value to store */
+	public void setDesc(final String desc) {
+		this.desc = desc;
+		valueChanged("desc");
+	}
+	/** @return the stored {@code policyDef} value */
+	public String getPolicyDef() {
+		return this.policyDef;
+	}
+	/** @param policyDef the value to store */
+	public void setPolicyDef(final String policyDef) {
+		this.policyDef = policyDef;
+		valueChanged("policyDef");
+	}
+	/** @return the stored {@code dedupeDef} value */
+	public String getDedupeDef() {
+		return this.dedupeDef;
+	}
+	/** @param dedupeDef the value to store */
+	public void setDedupeDef(final String dedupeDef) {
+		this.dedupeDef = dedupeDef;
+		valueChanged("dedupeDef");
+	}
+	/** @return the stored {@code notificationDef} value */
+	public String getNotificationDef() {
+		return this.notificationDef;
+	}
+	/** @param notificationDef the value to store */
+	public void setNotificationDef(final String notificationDef) {
+		this.notificationDef = notificationDef;
+		valueChanged("notificationDef");
+	}
+	/** @return the stored {@code remediationDef} value */
+	public String getRemediationDef() {
+		return this.remediationDef;
+	}
+	/** @param remediationDef the value to store */
+	public void setRemediationDef(final String remediationDef) {
+		this.remediationDef = remediationDef;
+		valueChanged("remediationDef");
+	}
+	/** @return the stored {@code enabled} flag */
+	public boolean isEnabled() {
+		return this.enabled;
+	}
+	/** @param enabled the value to store */
+	public void setEnabled(final boolean enabled) {
+		this.enabled = enabled;
+		valueChanged("enabled");
+	}
+	/** @return the stored {@code owner} value */
+	public String getOwner() {
+		return this.owner;
+	}
+	/** @param owner the value to store */
+	public void setOwner(final String owner) {
+		this.owner = owner;
+		valueChanged("owner");
+	}
+	/** @return the stored {@code lastModifiedDate} value */
+	public long getLastModifiedDate() {
+		return this.lastModifiedDate;
+	}
+	/** @param lastModifiedDate the value to store */
+	public void setLastModifiedDate(final long lastModifiedDate) {
+		this.lastModifiedDate = lastModifiedDate;
+		valueChanged("lastModifiedDate");
+	}
+	/** @return the stored {@code severity} value */
+	public long getSeverity() {
+		return this.severity;
+	}
+	/** @param severity the value to store */
+	public void setSeverity(final long severity) {
+		this.severity = severity;
+		valueChanged("severity");
+	}
+	/** @return the stored {@code createdTime} value */
+	public long getCreatedTime() {
+		return this.createdTime;
+	}
+	/** @param createdTime the value to store */
+	public void setCreatedTime(final long createdTime) {
+		this.createdTime = createdTime;
+		valueChanged("createdTime");
+	}
+
+	/**
+	 * Two definitions are equal when their {@code enabled} flag and their
+	 * policy, dedupe, notification and remediation definitions all match.
+	 *
+	 * @param o the object to compare with
+	 * @return true for the same instance or for another
+	 *         {@code AlertDefinitionAPIEntity} whose five compared fields match
+	 */
+	@Override
+	public boolean equals(Object o){
+		if (this == o) {
+			return true;
+		}
+		if (!(o instanceof AlertDefinitionAPIEntity)) {
+			return false;
+		}
+		final AlertDefinitionAPIEntity other = (AlertDefinitionAPIEntity) o;
+		return other.enabled == this.enabled
+				&& compare(other.policyDef, this.policyDef)
+				&& compare(other.dedupeDef, this.dedupeDef)
+				&& compare(other.notificationDef, this.notificationDef)
+				&& compare(other.remediationDef, this.remediationDef);
+	}
+
+	/** Null-safe string equality: two nulls are equal, one null is not. */
+	private boolean compare(String a, String b){
+		return a == null ? b == null : a.equals(b);
+	}
+
+	/**
+	 * Hashes the same five fields that {@link #equals(Object)} compares.
+	 *
+	 * @return the hash code built by {@code HashCodeBuilder}
+	 */
+	@Override
+	public int hashCode(){
+		return new HashCodeBuilder()
+				.append(enabled)
+				.append(policyDef)
+				.append(dedupeDef)
+				.append(notificationDef)
+				.append(remediationDef)
+				.toHashCode();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
new file mode 100644
index 0000000..ad518e9
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertEntityRepository.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+/**
+ * Entity repository listing the alert entities of this package.
+ *
+ * <p>The constructor fills {@code serDeserMap} and {@code entitySet}; neither
+ * is declared in this class, so they are presumably inherited from
+ * {@link EntityRepository} (confirm against that class).
+ */
+public class AlertEntityRepository extends EntityRepository {
+	/** Registers the AlertContext serializer and the six alert entity classes. */
+	public AlertEntityRepository() {
+		// AlertContext values are handled by AlertContextSerDeser.
+		serDeserMap.put(AlertContext.class, new AlertContextSerDeser());
+		entitySet.add(AlertAPIEntity.class);
+		entitySet.add(AlertDefinitionAPIEntity.class);
+		entitySet.add(AlertStreamSchemaEntity.class);
+		entitySet.add(AlertStreamEntity.class);
+		entitySet.add(AlertDataSourceEntity.class);
+        entitySet.add(AlertExecutorEntity.class);
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
new file mode 100644
index 0000000..97f18f7
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+/**
+ * Entity describing one alert executor.
+ *
+ * <p>Per the annotations below it uses table and prefix
+ * {@code alertExecutor}, column family {@code f}, is not a time series and is
+ * tagged by {@code dataSource}, {@code alertExecutorId} and
+ * {@code streamName}. Its only column is the description.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertExecutor")
+@ColumnFamily("f")
+@Prefix("alertExecutor")
+@Service(Constants.ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "alertExecutorId", "streamName"})
+public class AlertExecutorEntity extends TaggedLogAPIEntity{
+    /** Stored in column {@code a}. */
+    @Column("a")
+    private String desc;
+
+    /** @return the stored {@code desc} value */
+    public String getDesc() {
+        return this.desc;
+    }
+
+    /**
+     * Stores {@code desc} and reports the field {@code "desc"} as changed via
+     * {@code valueChanged}, which is not defined in this class.
+     *
+     * @param desc the value to store
+     */
+    public void setDesc(final String desc) {
+        this.desc = desc;
+        valueChanged("desc");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
new file mode 100644
index 0000000..adff3d5
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamEntity.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.policy.common.Constants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+/**
+ * Entity describing one alert stream.
+ *
+ * <p>Per the annotations below it uses table and prefix {@code alertStream},
+ * column family {@code f}, is not a time series and is tagged by
+ * {@code dataSource} and {@code streamName}. Its only column is the
+ * description.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertStream")
+@ColumnFamily("f")
+@Prefix("alertStream")
+@Service(Constants.ALERT_STREAM_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "streamName"})
+public class AlertStreamEntity extends TaggedLogAPIEntity{
+	/** Stored in column {@code a}. */
+	@Column("a")
+	private String desc;
+
+	/** @return the stored {@code desc} value */
+	public String getDesc() {
+		return this.desc;
+	}
+
+	/**
+	 * Stores {@code desc} and reports the field {@code "desc"} as changed via
+	 * {@code valueChanged}, which is not defined in this class.
+	 *
+	 * @param desc the value to store
+	 */
+	public void setDesc(final String desc) {
+		this.desc = desc;
+		valueChanged("desc");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
new file mode 100644
index 0000000..2ba2166
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.entity;
+
+import org.apache.eagle.policy.common.Constants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+/**
+ * Entity describing one attribute of an alert stream schema; it is tagged by
+ * {@code dataSource}, {@code streamName} and {@code attrName}.
+ *
+ * <p>DDL to create the backing stream metadata table:
+ * <pre>
+ * create 'alertStreamSchema', {NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', COMPRESSION => 'SNAPPY'}
+ * </pre>
+ *
+ * <p>Every setter stores the new value and then passes the field name to
+ * {@code valueChanged}, which is not defined in this class.
+ */
+@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
+@Table("alertStreamSchema")
+@ColumnFamily("f")
+@Prefix("alertStreamSchema")
+@Service(Constants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"dataSource", "streamName", "attrName"})
+public class AlertStreamSchemaEntity extends TaggedLogAPIEntity{
+	@Column("a")
+	private String attrType;
+	@Column("b")
+	private String category;
+	@Column("c")
+	private String attrValueResolver;
+	/* all tags form the key for alert de-duplication */
+	@Column("d")
+	private Boolean usedAsTag;
+	@Column("e")
+	private String attrDescription;
+	@Column("f")
+	private String attrDisplayName;
+	@Column("g")
+	private String defaultValue;
+
+	/** @return the stored {@code attrType} value */
+	public String getAttrType() {
+		return this.attrType;
+	}
+	/** @param attrType the value to store */
+	public void setAttrType(final String attrType) {
+		this.attrType = attrType;
+		valueChanged("attrType");
+	}
+	/** @return the stored {@code category} value */
+	public String getCategory() {
+		return this.category;
+	}
+	/** @param category the value to store */
+	public void setCategory(final String category) {
+		this.category = category;
+		valueChanged("category");
+	}
+	/** @return the stored {@code attrValueResolver} value */
+	public String getAttrValueResolver() {
+		return this.attrValueResolver;
+	}
+	/** @param attrValueResolver the value to store */
+	public void setAttrValueResolver(final String attrValueResolver) {
+		this.attrValueResolver = attrValueResolver;
+		valueChanged("attrValueResolver");
+	}
+	/** @return the stored {@code usedAsTag} value, which may be null */
+	public Boolean getUsedAsTag() {
+		return this.usedAsTag;
+	}
+	/** @param usedAsTag the value to store */
+	public void setUsedAsTag(final Boolean usedAsTag) {
+		this.usedAsTag = usedAsTag;
+		valueChanged("usedAsTag");
+	}
+	/** @return the stored {@code attrDescription} value */
+	public String getAttrDescription() {
+		return this.attrDescription;
+	}
+	/** @param attrDescription the value to store */
+	public void setAttrDescription(final String attrDescription) {
+		this.attrDescription = attrDescription;
+		valueChanged("attrDescription");
+	}
+	/** @return the stored {@code attrDisplayName} value */
+	public String getAttrDisplayName() {
+		return this.attrDisplayName;
+	}
+	/** @param attrDisplayName the value to store */
+	public void setAttrDisplayName(final String attrDisplayName) {
+		this.attrDisplayName = attrDisplayName;
+		valueChanged("attrDisplayName");
+	}
+	/** @return the stored {@code defaultValue} value */
+	public String getDefaultValue() {
+		return this.defaultValue;
+	}
+	/** @param defaultValue the value to store */
+	public void setDefaultValue(final String defaultValue) {
+		this.defaultValue = defaultValue;
+		valueChanged("defaultValue");
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
new file mode 100644
index 0000000..1143b11
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+
+/**
+ * Partitioner that derives the partition index from the hash codes of the
+ * policy type and the policy id.
+ */
+public class DefaultPolicyPartitioner implements PolicyPartitioner{
+	/**
+	 * Maps a policy to a partition index.
+	 *
+	 * <p>The two hash codes are combined with the multiplier 31 and the
+	 * result is reduced modulo {@code numTotalPartitions}. Every input that
+	 * already produced a non-negative index yields the same index as before.
+	 *
+	 * @param numTotalPartitions number of partitions; must not be zero
+	 * @param policyType policy type, must not be null
+	 * @param policyId policy id, must not be null
+	 * @return an index that is never negative and, for a positive
+	 *         {@code numTotalPartitions}, lies in {@code [0, numTotalPartitions)}
+	 */
+	@Override
+	public int partition(int numTotalPartitions, String policyType,
+			String policyId) {
+		final int prime = 31;
+		int result = 1;
+		result = result * prime + policyType.hashCode();
+		result = result < 0 ? result*-1 : result;
+		result = result * prime + policyId.hashCode();
+		// Negating Integer.MIN_VALUE overflows back to Integer.MIN_VALUE, so
+		// negating the combined hash could still leave it negative and make
+		// the remainder negative. The remainder is always strictly smaller in
+		// magnitude than the divisor, so taking its absolute value is safe
+		// and equals the old result whenever the old result was valid.
+		return Math.abs(result % numTotalPartitions);
+	}
+}