You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/06/02 07:08:14 UTC

[35/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-integration2.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-integration2.sh b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-integration2.sh
new file mode 100755
index 0000000..5821309
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-integration2.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+echo "$(dirname "$0")"
+
+# Quote the value: unquoted, the second flag is parsed as another name to export.
+export MAVEN_OPTS="-Xms256M -Xmx1024M"
+
+#start topology
+echo "starting topology..."
+cd "$(dirname "$0")/../../alert-engine/alert-engine-base/" || exit 1
+
+echo " as dev tests, tail -f test.log | grep AlertStreamEvent for alert stream....."
+
+# Run the Integration2 end-to-end test, mirroring its output into test.log.
+mvn test -Dtest=org.apache.eagle.alert.engine.e2e.Integration2 | tee test.log
+
+# tail log output
+# tail -f test.log | grep AlertStreamEvent
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-metadata.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-metadata.sh b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-metadata.sh
new file mode 100755
index 0000000..1732c04
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-metadata.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cd "$(dirname "$0")/../../alert-metadata-parent/alert-metadata-service" || exit 1
+# Launch the module in this directory through the Jetty Maven plugin.
+mvn jetty:run

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-sampleclient1.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-sampleclient1.sh b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-sampleclient1.sh
new file mode 100755
index 0000000..df60739
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-sampleclient1.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cd $(dirname $0)/../../alert-assembly/
+
+java -cp ${MAVEN_REPO}/org/apache/storm/storm-core/0.9.3/storm-core-0.9.3.jar:target/alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar org.apache.eagle.alert.engine.e2e.SampleClient1
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-sampleclient2.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-sampleclient2.sh b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-sampleclient2.sh
new file mode 100755
index 0000000..d5480a9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-sampleclient2.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cd $(dirname $0)/../../alert-assembly/
+
+java -cp ${MAVEN_REPO}/org/apache/storm/storm-core/0.9.3/storm-core-0.9.3.jar:target/alert-engine-0.0.1-SNAPSHOT-alert-assembly.jar org.apache.eagle.alert.engine.e2e.SampleClient2
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-zk-kafka.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-zk-kafka.sh b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-zk-kafka.sh
new file mode 100755
index 0000000..c58de65
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/start-zk-kafka.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cd $(dirname $0)/..
+
+echo "Starting zookeeper"
+bin/zookeeper-server-start.sh -daemon conf/zookeeper-server.properties
+sleep 1
+bin/zookeeper-server-status.sh
+
+echo "Starting kafka"
+bin/kafka-server-start.sh -daemon conf/kafka-server.properties
+sleep 1
+bin/kafka-server-status.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/stop-zk-kafka.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/stop-zk-kafka.sh b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/stop-zk-kafka.sh
new file mode 100755
index 0000000..dc8a898
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/stop-zk-kafka.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+cd $(dirname $0)/..
+
+echo "Stopping zookeeper"
+bin/zookeeper-server-stop.sh -daemon conf/zookeeper-server.properties
+sleep 1
+bin/zookeeper-server-status.sh
+
+echo "Stopping kafka"
+bin/kafka-server-stop.sh -daemon conf/kafka-server.properties
+sleep 1
+bin/kafka-server-status.sh
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-start.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-start.sh b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-start.sh
new file mode 100755
index 0000000..a68c271
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-start.sh
@@ -0,0 +1,50 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+if [ $# -lt 1 ];
+then
+	echo "USAGE: $0 [-daemon] conf/zookeeper-server.properties"
+	exit 1
+fi
+base_dir=$(dirname $0)
+
+if [ "x$LOG4J_OPTS" = "x" ]; then
+    export LOG4J_OPTS="-Dlog4j.configuration=file:$base_dir/../conf/log4j.properties"
+fi
+
+if [ "x$EAGLE_HEAP_OPTS" = "x" ]; then
+    export EAGLE_HEAP_OPTS="-Xmx512M -Xms512M"
+fi
+
+EXTRA_ARGS="-name zookeeper -loggc"
+
+COMMAND=$1
+case $COMMAND in
+  -daemon)
+     EXTRA_ARGS="-daemon "$EXTRA_ARGS
+     shift
+     ;;
+ *)
+     ;;
+esac
+
+# Refuse to start a second instance: the status script exits 0 only when a server is running.
+if "$base_dir/zookeeper-server-status.sh" >/dev/null 2>&1; then
+	echo "Zookeeper is still running, please stop firstly before starting"
+	exit 0
+else
+	exec "$base_dir/run-class.sh" $EXTRA_ARGS org.apache.zookeeper.server.quorum.QuorumPeerMain "$@"
+fi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-status.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-status.sh b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-status.sh
new file mode 100755
index 0000000..223a310
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-status.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+PIDS=$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')
+
+if [ -z "$PIDS" ]; then
+  echo "No zookeeper server is running"
+  exit 1
+else
+  echo "Zookeeper server is running at $PIDS"
+  exit 0
+fi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-stop.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-stop.sh b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-stop.sh
new file mode 100755
index 0000000..4155182
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/bin/zookeeper-server-stop.sh
@@ -0,0 +1,24 @@
+#!/bin/sh
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+PIDS=$(ps ax | grep java | grep -i QuorumPeerMain | grep -v grep | awk '{print $1}')
+
+if [ -z "$PIDS" ]; then
+  echo "No zookeeper server to stop"
+  exit 1
+else
+  kill -s TERM $PIDS
+fi
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/cli-log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/cli-log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/cli-log4j.properties
new file mode 100644
index 0000000..d59ded6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/cli-log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties
new file mode 100644
index 0000000..a9e0010
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/kafka-server.properties
@@ -0,0 +1,115 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# see kafka.server.KafkaConfig for additional details and defaults
+
+############################# Server Basics #############################
+
+# The id of the broker. This must be set to a unique integer for each broker.
+broker.id=0
+
+############################# Socket Server Settings #############################
+
+# The address the socket server listens on. It will get the value returned from
+# java.net.InetAddress.getCanonicalHostName() if not configured.
+#   FORMAT:
+#     listeners = security_protocol://host_name:port
+#   EXAMPLE:
+#     listeners = PLAINTEXT://your.host.name:9092
+#listeners=PLAINTEXT://:9092
+
+# Hostname and port the broker will advertise to producers and consumers. If not set,
+# it uses the value for "listeners" if configured.  Otherwise, it will use the value
+# returned from java.net.InetAddress.getCanonicalHostName().
+#advertised.listeners=PLAINTEXT://your.host.name:9092
+
+# The number of threads handling network requests
+num.network.threads=3
+
+# The number of threads doing disk I/O
+num.io.threads=8
+
+# The send buffer (SO_SNDBUF) used by the socket server
+socket.send.buffer.bytes=102400
+
+# The receive buffer (SO_RCVBUF) used by the socket server
+socket.receive.buffer.bytes=102400
+
+# The maximum size of a request that the socket server will accept (protection against OOM)
+socket.request.max.bytes=104857600
+
+
+############################# Log Basics #############################
+
+# A comma separated list of directories under which to store log files
+log.dirs=/tmp/dev-kafka-logs
+
+# The default number of log partitions per topic. More partitions allow greater
+# parallelism for consumption, but this will also result in more files across
+# the brokers.
+num.partitions=1
+
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
+############################# Log Flush Policy #############################
+
+# Messages are immediately written to the filesystem but by default we only fsync() to sync
+# the OS cache lazily. The following configurations control the flush of data to disk.
+# There are a few important trade-offs here:
+#    1. Durability: Unflushed data may be lost if you are not using replication.
+#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
+#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
+# The settings below allow one to configure the flush policy to flush data after a period of time or
+# every N messages (or both). This can be done globally and overridden on a per-topic basis.
+
+# The number of messages to accept before forcing a flush of data to disk
+#log.flush.interval.messages=10000
+
+# The maximum amount of time a message can sit in a log before we force a flush
+#log.flush.interval.ms=1000
+
+############################# Log Retention Policy #############################
+
+# The following configurations control the disposal of log segments. The policy can
+# be set to delete segments after a period of time, or after a given size has accumulated.
+# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
+# from the end of the log.
+
+# The minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
+# segments don't drop below log.retention.bytes.
+#log.retention.bytes=1073741824
+
+# The maximum size of a log segment file. When this size is reached a new log segment will be created.
+log.segment.bytes=1073741824
+
+# The interval at which log segments are checked to see if they can be deleted according
+# to the retention policies
+log.retention.check.interval.ms=300000
+
+############################# Zookeeper #############################
+
+# Zookeeper connection string (see zookeeper docs for details).
+# This is a comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
+# You can also append an optional chroot string to the urls to specify the
+# root directory for all kafka znodes.
+zookeeper.connect=localhost:2181
+
+# Timeout in ms for connecting to zookeeper
+zookeeper.connection.timeout.ms=6000
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/log4j.properties
new file mode 100644
index 0000000..d59ded6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/zookeeper-server.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/zookeeper-server.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/zookeeper-server.properties
new file mode 100644
index 0000000..dd71ffc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/conf/zookeeper-server.properties
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# the directory where the snapshot is stored.
+dataDir=/tmp/dev-zookeeper-data
+# the port at which the clients will connect
+clientPort=2181
+# disable the per-ip limit on the number of connections since this is a non-production config
+maxClientCnxns=0
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/pom.xml
new file mode 100644
index 0000000..bb5b0e3
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/pom.xml
@@ -0,0 +1,108 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
+	~ contributor license agreements. See the NOTICE file distributed with ~
+	this work for additional information regarding copyright ownership. ~ The
+	ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
+	"License"); you may not use this file except in compliance with ~ the License.
+	You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
+	~ ~ Unless required by applicable law or agreed to in writing, software ~
+	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
+	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
+	License for the specific language governing permissions and ~ limitations
+	under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.eagle</groupId>
+        <artifactId>eagle-alert</artifactId>
+        <version>0.4.0-incubating-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>alert-devtools</artifactId>
+
+    <properties>
+        <maven-scala.version>2.15.0</maven-scala.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>alert-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>${kafka.artifact.id}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/java</sourceDirectory>
+
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>${maven-compiler.version}</version>
+            </plugin>
+            <!-- <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <version>${maven-scala.version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                        <configuration>
+                            <args>
+                                <arg>-make:transitive</arg>
+                                <arg>-dependencyfile</arg>
+                                <arg>${project.build.directory}/.scala_dependencies</arg>
+                            </args>
+                            <sourceDir>src/main/scala</sourceDir>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin> -->
+
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>install</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/lib</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
new file mode 100644
index 0000000..8f98d71
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffset.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.tools;
+
+import java.util.Map;
+
+/**
+ * Plain data holder for one consumer-offset record.
+ *
+ * <p>KafkaConsumerOffsetFetcher deserializes the JSON payload of a ZooKeeper
+ * partition node into this class with Jackson, so the field names must match
+ * the keys of that JSON document.
+ */
+public class KafkaConsumerOffset {
+    // NOTE(review): presumably identifies the consuming topology (e.g. id/name) - confirm against the writer.
+    public Map<String, String> topology;
+    // Offset recorded for the partition; this is the value KafkaConsumerOffsetFetcher#fetch returns.
+    public Long offset;
+    // NOTE(review): presumably the Kafka partition number - confirm against the writer.
+    public Long partition;
+    // NOTE(review): presumably broker connection details (e.g. host/port) - confirm against the writer.
+    public Map<String, String> broker;
+    // NOTE(review): presumably the Kafka topic name - confirm against the writer.
+    public String topic;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
new file mode 100644
index 0000000..4fe471c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaConsumerOffsetFetcher.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.tools;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.eagle.alert.config.ZKConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reads the offsets that a Kafka consumer has stored in ZooKeeper.
+ *
+ * <p>The constructor opens and starts a Curator client; call {@link #close()}
+ * when the fetcher is no longer needed so the ZooKeeper connection is released.
+ */
+public class KafkaConsumerOffsetFetcher implements java.io.Closeable {
+    // NOTE(review): the connection timeout is fixed here rather than taken from ZKConfig - confirm that is intended.
+    private static final int ZK_CONNECTION_TIMEOUT_MS = 15000;
+
+    public CuratorFramework curator;
+    public String zkRoot;
+    public ObjectMapper mapper;
+    public String zkPathToPartition;
+
+    /**
+     * Connects to ZooKeeper and resolves the path whose children hold the per-partition offsets.
+     *
+     * @param config     ZooKeeper connection settings; {@code config.zkRoot} is used as a
+     *                   {@link String#format(String, Object...)} pattern
+     * @param parameters values substituted into the {@code zkRoot} pattern, in order
+     * @throws RuntimeException wrapping any failure while connecting or formatting the path
+     */
+    public KafkaConsumerOffsetFetcher(ZKConfig config, String ... parameters) {
+        try {
+            this.curator = CuratorFrameworkFactory.newClient(config.zkQuorum, config.zkSessionTimeoutMs,
+                    ZK_CONNECTION_TIMEOUT_MS, new RetryNTimes(config.zkRetryTimes, config.zkRetryInterval));
+            curator.start();
+            this.zkRoot = config.zkRoot;
+            mapper = new ObjectMapper();
+            Module module = new SimpleModule("offset").registerSubtypes(new NamedType(KafkaConsumerOffset.class));
+            mapper.registerModule(module);
+            // Explicit cast: pass the strings as the format arguments, not as a single array argument.
+            zkPathToPartition = String.format(config.zkRoot, (Object[]) parameters);
+        } catch (Exception e) {
+            // Do not leak a started client when construction fails half-way.
+            if (curator != null) {
+                curator.close();
+            }
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Fetches the stored offset of every partition node below the resolved ZooKeeper path.
+     *
+     * @return map from partition node name to the offset stored in that node; empty when
+     *         the path does not exist
+     * @throws Exception if ZooKeeper cannot be read or a node's payload cannot be parsed
+     */
+    public Map<String, Long> fetch() throws Exception {
+        Map<String, Long> map = new HashMap<String, Long>();
+        if (curator.checkExists().forPath(zkPathToPartition) == null) {
+            return map;
+        }
+        List<String> partitions = curator.getChildren().forPath(zkPathToPartition);
+        for (String partition : partitions) {
+            String partitionPath = zkPathToPartition + "/" + partition;
+            // Decode with a fixed charset instead of the platform default.
+            String data = new String(curator.getData().forPath(partitionPath),
+                    java.nio.charset.StandardCharsets.UTF_8);
+            KafkaConsumerOffset offset = mapper.readValue(data, KafkaConsumerOffset.class);
+            map.put(partition, offset.offset);
+        }
+        return map;
+    }
+
+    /**
+     * Closes the underlying Curator client and with it the ZooKeeper connection.
+     */
+    @Override
+    public void close() {
+        if (curator != null) {
+            curator.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
new file mode 100644
index 0000000..b685e5c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/java/org/apache/eagle/alert/tools/KafkaLatestOffsetFetcher.java
@@ -0,0 +1,97 @@
+package org.apache.eagle.alert.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import java.util.*;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+
+/**
+ * Looks up the latest (log-end) offset of each partition of a topic by asking the
+ * partition leaders through Kafka's {@link SimpleConsumer} API.
+ *
+ * <p>All brokers are contacted on the port of the first entry in the broker list.
+ */
+public class KafkaLatestOffsetFetcher {
+
+    private static final int SOCKET_TIMEOUT_MS = 100000;
+    private static final int BUFFER_SIZE_BYTES = 64 * 1024;
+
+    private final List<String> brokerList;
+    private final int port;
+
+    /**
+     * @param brokerList comma-separated {@code host:port} entries; whitespace around
+     *                   entries is ignored. Only the port of the first entry is used.
+     */
+    public KafkaLatestOffsetFetcher(String brokerList) {
+        this.brokerList = new ArrayList<>();
+        String[] brokers = brokerList.split(",");
+        for (String broker : brokers) {
+            this.brokerList.add(broker.trim().split(":")[0]);
+        }
+        this.port = Integer.parseInt(brokers[0].trim().split(":")[1]);
+    }
+
+    /**
+     * Fetches the latest offset for partitions {@code 0 .. partitionCount - 1} of a topic.
+     *
+     * @param topic          topic to inspect
+     * @param partitionCount number of partitions to query, starting at partition 0
+     * @return map from partition id to latest offset; {@code -1} for a partition whose
+     *         metadata or leader could not be found
+     * @throws RuntimeException if a broker cannot be reached or reports an error
+     */
+    public Map<Integer, Long> fetch(String topic, int partitionCount) {
+        Map<Integer, PartitionMetadata> metadatas = fetchPartitionMetadata(brokerList, port, topic, partitionCount);
+        Map<Integer, Long> ret = new HashMap<>();
+        for (int partition = 0; partition < partitionCount; partition++) {
+            PartitionMetadata metadata = metadatas.get(partition);
+            if (metadata == null || metadata.leader() == null) {
+                // No leader known: report -1 and move on instead of dereferencing null below.
+                ret.put(partition, -1L);
+                continue;
+            }
+            String leadBroker = metadata.leader().host();
+            String clientName = "Client_" + topic + "_" + partition;
+            SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, SOCKET_TIMEOUT_MS, BUFFER_SIZE_BYTES, clientName);
+            try {
+                ret.put(partition, getLatestOffset(consumer, topic, partition, clientName));
+            } finally {
+                // Release the socket even when the offset request fails.
+                consumer.close();
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Asks one broker for the latest offset of a single partition.
+     *
+     * @param consumer   open consumer connected to the partition leader; not closed by this method
+     * @param topic      topic name
+     * @param partition  partition id
+     * @param clientName client id sent with the request
+     * @return the latest offset reported by the broker
+     * @throws RuntimeException if the response carries an error or contains no offset
+     */
+    public long getLatestOffset(SimpleConsumer consumer, String topic, int partition, String clientName) {
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+        Map<TopicAndPartition, kafka.api.PartitionOffsetRequestInfo> requestInfo = new HashMap<>();
+        // Request a single offset: the one at "latest time".
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
+        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+        OffsetResponse response = consumer.getOffsetsBefore(request);
+        if (response.hasError()) {
+            throw new RuntimeException("Error fetching data offset from the broker. Reason: " + response.errorCode(topic, partition) );
+        }
+        long[] offsets = response.offsets(topic, partition);
+        if (offsets == null || offsets.length == 0) {
+            throw new RuntimeException("Broker returned no offset for topic [" + topic + "] partition [" + partition + "]");
+        }
+        return offsets[0];
+    }
+
+    /**
+     * Collects partition metadata for a topic, querying brokers in list order until
+     * metadata for {@code partitionCount} partitions has been gathered.
+     *
+     * @return map from partition id to its metadata; may hold fewer entries than requested
+     * @throws RuntimeException if communication with a queried broker fails
+     */
+    private Map<Integer, PartitionMetadata> fetchPartitionMetadata(List<String> brokerList, int port, String topic, int partitionCount) {
+        Map<Integer, PartitionMetadata> partitionMetadata = new HashMap<>();
+        for (String broker : brokerList) {
+            SimpleConsumer consumer = null;
+            try {
+                consumer = new SimpleConsumer(broker, port, SOCKET_TIMEOUT_MS, BUFFER_SIZE_BYTES, "leaderLookup");
+                List<String> topics = Collections.singletonList(topic);
+                TopicMetadataRequest req = new TopicMetadataRequest(topics);
+                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
+
+                List<TopicMetadata> metaData = resp.topicsMetadata();
+                for (TopicMetadata item : metaData) {
+                    for (PartitionMetadata part : item.partitionsMetadata()) {
+                        partitionMetadata.put(part.partitionId(), part);
+                    }
+                }
+                if (partitionMetadata.size() == partitionCount) {
+                    break;
+                }
+            } catch (Exception e) {
+                throw new RuntimeException("Error communicating with Broker [" + broker + "] " + "to find Leader for [" + topic + "] Reason: ", e);
+            } finally {
+                if (consumer != null) {
+                    consumer.close();
+                }
+            }
+        }
+        return partitionMetadata;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
new file mode 100644
index 0000000..830e9ac
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/main/scala/org/apache/eagle/contrib/kafka/ProducerTool.scala
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.contrib.kafka
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.Properties
+import java.util.concurrent.Executors
+
+import joptsimple._
+import kafka.message._
+import kafka.producer.ConsoleProducer.{LineMessageReader, MessageReader}
+import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
+import kafka.serializer._
+import org.apache.commons.io.FileUtils
+
+import scala.collection.JavaConversions._
+
+/**
+ * Command-line tool that publishes messages to a Kafka topic through
+ * `kafka.producer.Producer`.
+ *
+ * Messages are read by a `MessageReader` from the first `--data` value, else the
+ * first `--file` value, else standard input. Each message that is read is sent
+ * `--replication` times.
+ */
+object ProducerTool {
+  /**
+   * Parses the command line, configures the producer and sends every message
+   * the reader yields. Terminates the JVM with status 0 on success and 1 on a
+   * missing required option or any exception.
+   */
+  def main(args: Array[String]) {
+    val parser = new OptionParser
+    val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
+    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+      .withRequiredArg
+      .describedAs("broker-list")
+      .ofType(classOf[String])
+    val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
+    val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
+    val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(200)
+    val messageSendMaxRetriesOpt = parser.accepts("message-send-max-retries", "Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retires before the producer give up and drop this message.")
+      .withRequiredArg
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(3)
+    val retryBackoffMsOpt = parser.accepts("retry-backoff-ms", "Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata.")
+      .withRequiredArg
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(100l)
+    val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
+      " a message will queue awaiting suffient batch size. The value is given in ms.")
+      .withRequiredArg
+      .describedAs("timeout_ms")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(1000l)
+    val queueSizeOpt = parser.accepts("queue-size", "If set and the producer is running in asynchronous mode, this gives the maximum amount of " +
+      " messages will queue awaiting suffient batch size.")
+      .withRequiredArg
+      .describedAs("queue_size")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(10000l)
+    val queueEnqueueTimeoutMsOpt = parser.accepts("queue-enqueuetimeout-ms", "Timeout for event enqueue")
+      .withRequiredArg
+      .describedAs("queue enqueuetimeout ms")
+      .ofType(classOf[java.lang.Long])
+      .defaultsTo(Int.MaxValue.toLong)
+    val requestRequiredAcksOpt = parser.accepts("request-required-acks", "The required acks of the producer requests")
+      .withRequiredArg
+      .describedAs("request required acks")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(0)
+    val requestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The ack timeout of the producer requests. Value must be non-negative and non-zero")
+      .withRequiredArg
+      .describedAs("request timeout ms")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1500)
+    val valueEncoderOpt = parser.accepts("value-serializer", "The class name of the message encoder implementation to use for serializing values.")
+      .withRequiredArg
+      .describedAs("encoder_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[StringEncoder].getName)
+    val keyEncoderOpt = parser.accepts("key-serializer", "The class name of the message encoder implementation to use for serializing keys.")
+      .withRequiredArg
+      .describedAs("encoder_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[StringEncoder].getName)
+    val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
+      "By default each line is read as a separate message.")
+      .withRequiredArg
+      .describedAs("reader_class")
+      .ofType(classOf[java.lang.String])
+      .defaultsTo(classOf[LineMessageReader].getName)
+    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
+      .withRequiredArg
+      .describedAs("size")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1024 * 100)
+    val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
+      "This allows custom configuration for a user-defined message reader.")
+      .withRequiredArg
+      .describedAs("prop")
+      .ofType(classOf[String])
+
+    // Input sources and replication factor specific to this tool.
+    val dataOpt = parser.accepts("data", "Input message data content")
+      .withRequiredArg()
+      .describedAs("input message data")
+      .ofType(classOf[String])
+
+    val fileOpt = parser.accepts("file", "Input message file name")
+      .withRequiredArg()
+      .describedAs("input message file")
+      .ofType(classOf[String])
+
+    val replicationOpt = parser.accepts("replication", "Message data replication count")
+      .withRequiredArg()
+      .describedAs("message replication count, default: 1")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(1)
+
+    val options = parser.parse(args: _*)
+    // --topic and --broker-list are mandatory: print usage and exit with status 1 if either is absent.
+    for (arg <- List(topicOpt, brokerListOpt)) {
+      if (!options.has(arg)) {
+        System.err.println("Missing required argument \"" + arg + "\"")
+        parser.printHelpOn(System.err)
+        System.exit(1)
+      }
+    }
+
+    val topic = options.valueOf(topicOpt)
+    val brokerList = options.valueOf(brokerListOpt)
+    val sync = options.has(syncOpt)
+    val compress = options.has(compressOpt)
+    val batchSize = options.valueOf(batchSizeOpt)
+    val sendTimeout = options.valueOf(sendTimeoutOpt)
+    val queueSize = options.valueOf(queueSizeOpt)
+    val queueEnqueueTimeoutMs = options.valueOf(queueEnqueueTimeoutMsOpt)
+    val requestRequiredAcks = options.valueOf(requestRequiredAcksOpt)
+    val requestTimeoutMs = options.valueOf(requestTimeoutMsOpt)
+    val keyEncoderClass = options.valueOf(keyEncoderOpt)
+    val valueEncoderClass = options.valueOf(valueEncoderOpt)
+    val readerClass = options.valueOf(messageReaderOpt)
+    val socketBuffer = options.valueOf(socketBufferSizeOpt)
+    val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
+    val messageData = options.valuesOf(dataOpt)
+    val messageFile = options.valuesOf(fileOpt)
+    val messageReplication = options.valueOf(replicationOpt)
+
+    // The reader receives the topic through its properties; the producer gets its own Properties below.
+    cmdLineProps.put("topic", topic)
+    val props = new Properties()
+    props.put("metadata.broker.list", brokerList)
+    val codec = if (compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
+    props.put("compression.codec", codec.toString)
+    props.put("producer.type", if (sync) "sync" else "async")
+    // batch.num.messages is only forwarded when --batch-size is present in the parsed options.
+    if (options.has(batchSizeOpt))
+      props.put("batch.num.messages", batchSize.toString)
+
+    props.put("message.send.max.retries", options.valueOf(messageSendMaxRetriesOpt).toString)
+    props.put("retry.backoff.ms", options.valueOf(retryBackoffMsOpt).toString)
+    props.put("queue.buffering.max.ms", sendTimeout.toString)
+    props.put("queue.buffering.max.messages", queueSize.toString)
+    props.put("queue.enqueue.timeout.ms", queueEnqueueTimeoutMs.toString)
+    props.put("request.required.acks", requestRequiredAcks.toString)
+    props.put("request.timeout.ms", requestTimeoutMs.toString)
+    props.put("key.serializer.class", keyEncoderClass)
+    props.put("serializer.class", valueEncoderClass)
+    props.put("send.buffer.bytes", socketBuffer.toString)
+
+    // The reader class is instantiated reflectively through its no-argument constructor.
+    val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader[AnyRef, AnyRef]]
+
+    // Input precedence: first --data value, then first --file value, then stdin.
+    // NOTE(review): the stream opened for --file is not closed anywhere in this method.
+    if (messageData.size()>0) {
+      reader.init(new ByteArrayInputStream(messageData.get(0).getBytes(StandardCharsets.UTF_8)), cmdLineProps)
+    } else if (messageFile.size()>0) {
+      reader.init(FileUtils.openInputStream(new File(messageFile.get(0))), cmdLineProps)
+    } else {
+      reader.init(System.in, cmdLineProps)
+    }
+
+    try {
+      val executor = Executors.newCachedThreadPool()
+      val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
+      // The System.exit calls below trigger this hook, which closes the producer and stops the pool.
+      Runtime.getRuntime.addShutdownHook(new Thread() {
+        override def run(): Unit = {
+          producer.close()
+          executor.shutdown()
+        }
+      })
+
+      // Read until the reader returns null; every message is sent messageReplication times.
+      var message: KeyedMessage[AnyRef, AnyRef] = null
+      do {
+        message = reader.readMessage()
+        if (message != null) {
+          var i = 0
+          while (i < messageReplication) {
+            // Each send is submitted to the pool and awaited with get(), so sends run one at a time.
+            executor.submit(new Runnable {
+              override def run(): Unit = producer.send(message)
+            }).get()
+            i += 1
+          }
+          System.out.println("Produced %d messages".format(messageReplication))
+        }
+      } while (message != null)
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        System.exit(1)
+    }
+    System.exit(0)
+  }
+
+  /**
+   * Converts `key=value` strings into a `Properties` object.
+   *
+   * Each argument is split on "="; if any argument does not split into exactly
+   * two parts, an error is printed to stderr and the JVM exits with status 1.
+   *
+   * @param args the raw `key=value` strings
+   * @return properties holding one entry per argument
+   */
+  def parseLineReaderArgs(args: Iterable[String]): Properties = {
+    val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
+    if (!splits.forall(_.length == 2)) {
+      System.err.println("Invalid line reader properties: " + args.mkString(" "))
+      System.exit(1)
+    }
+    val props = new Properties
+    for (a <- splits)
+      props.put(a(0), a(1))
+    props
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
new file mode 100644
index 0000000..e4d9f8f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/java/org/apache/eagle/alert/tools/TestKafkaOffset.java
@@ -0,0 +1,69 @@
+package org.apache.eagle.alert.tools;
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class TestKafkaOffset {
+    @Ignore
+    @Test
+    public void test() throws Exception {
+        System.setProperty("config.resource", "/kafka-offset-test.application.conf");
+        Config config = ConfigFactory.load();
+        ZKConfig zkConfig = new ZKConfig();
+        zkConfig.zkQuorum = config.getString("dataSourceConfig.zkQuorum");
+        zkConfig.zkRoot = config.getString("dataSourceConfig.transactionZKRoot");
+        zkConfig.zkSessionTimeoutMs = config.getInt("dataSourceConfig.zkSessionTimeoutMs");
+        zkConfig.connectionTimeoutMs = config.getInt("dataSourceConfig.connectionTimeoutMs");
+        zkConfig.zkRetryTimes = config.getInt("dataSourceConfig.zkRetryTimes");
+        zkConfig.zkRetryInterval = config.getInt("dataSourceConfig.zkRetryInterval");
+
+        String topic = "testTopic1";
+        String topology = "alertUnitTopology_1";
+
+        while(true) {
+            KafkaConsumerOffsetFetcher consumerOffsetFetcher = new KafkaConsumerOffsetFetcher(zkConfig, topic, topology);
+            String kafkaBrokerList = config.getString("dataSourceConfig.kafkaBrokerList");
+            KafkaLatestOffsetFetcher latestOffsetFetcher = new KafkaLatestOffsetFetcher(kafkaBrokerList);
+
+            Map<String, Long> consumedOffset = consumerOffsetFetcher.fetch();
+            if(consumedOffset.size() == 0){
+                System.out.println("no any consumer offset found for this topic " + topic);
+            }
+            Map<Integer, Long> latestOffset = latestOffsetFetcher.fetch(topic, consumedOffset.size());
+            if(latestOffset.size() == 0){
+                System.out.println("no any latest offset found for this topic " + topic);
+            }
+            for (Map.Entry<String, Long> entry : consumedOffset.entrySet()) {
+                String partition = entry.getKey();
+                Integer partitionNumber = Integer.valueOf(partition.split("_")[1]);
+                Long lag = latestOffset.get(partitionNumber) - entry.getValue();
+                System.out.println(String.format("parition %s, total: %d, consumed: %d, lag: %d",
+                        partition, latestOffset.get(partitionNumber), entry.getValue(), lag));
+            }
+            Thread.sleep(10000);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
new file mode 100644
index 0000000..4d2d4ff
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/kafka-offset-test.application.conf
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Kafka / ZooKeeper endpoints and ZooKeeper client settings loaded by TestKafkaOffset.
+"dataSourceConfig":{
+  "kafkaBrokerList" : "kafka-broker:9092",
+  "zkQuorum" : "zk-admin:2181",
+  # String.format pattern: TestKafkaOffset hands it to KafkaConsumerOffsetFetcher as zkRoot,
+  # which substitutes the two %s with (topic, topology name), in that order.
+  "transactionZKRoot" : "/consumers/eagle_consumer/%s/%s",
+  "zkSessionTimeoutMs" : 10000,
+  "connectionTimeoutMs" : 10000,
+  "zkRetryTimes" : 3,
+  # NOTE(review): presumably milliseconds between retries - confirm against Curator's RetryNTimes.
+  "zkRetryInterval" : 3000
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties
new file mode 100644
index 0000000..d59ded6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-devtools/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Root logger: level INFO, routed to the "stdout" appender defined below.
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+# Pattern: ISO8601 timestamp, level, [thread], last two components of the logger name, [line number], message.
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/.gitignore b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/.gitignore
@@ -0,0 +1 @@
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
new file mode 100644
index 0000000..d6ae04c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/pom.xml
@@ -0,0 +1,149 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.eagle</groupId>
+		<artifactId>eagle-alert</artifactId>
+		<version>0.4.0-incubating-SNAPSHOT</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+
+	<artifactId>alert-engine</artifactId>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.eagle</groupId>
+			<artifactId>alert-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-kafka</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>${kafka.artifact.id}</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.sun.jersey</groupId>
+			<artifactId>jersey-client</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.codehaus.jackson</groupId>
+			<artifactId>jackson-jaxrs</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.netflix.archaius</groupId>
+			<artifactId>archaius-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-query-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-query-compiler</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-extension-regex</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-extension-string</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<!-- Each groupId:artifactId must be declared only once within <dependencies>. -->
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.101tec</groupId>
+			<artifactId>zkclient</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.typesafe</groupId>
+			<artifactId>config</artifactId>
+		</dependency>
+
+		<dependency>
+			<groupId>org.mockito</groupId>
+			<artifactId>mockito-all</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.dropwizard.metrics</groupId>
+			<artifactId>metrics-jvm</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.mapdb</groupId>
+			<artifactId>mapdb</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.velocity</groupId>
+			<artifactId>velocity</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>javax.mail</groupId>
+			<artifactId>mail</artifactId>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.6</version>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
new file mode 100755
index 0000000..7219271
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/AlertStreamCollector.java
@@ -0,0 +1,27 @@
+package org.apache.eagle.alert.engine;
+
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * A {@link Collector} of {@link AlertStreamEvent}s that can additionally be flushed and closed.
+ */
+public interface AlertStreamCollector extends Collector<AlertStreamEvent> {
+    /**
+     * Flushes this collector.
+     *
+     * <p>Implementations need not be thread-safe here, but the method should be called
+     * synchronously, for example from within a Storm bolt's execute method.
+     */
+    void flush();
+
+    /**
+     * Closes this collector.
+     *
+     * <p>NOTE(review): this interface does not state whether pending events are flushed on
+     * close; confirm against the implementations.
+     */
+    void close();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java
new file mode 100755
index 0000000..2f136ab
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/Collector.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine;
+
+/**
+ * Generic sink to which items of type {@code T} are emitted.
+ *
+ * @param <T> type of the emitted items
+ */
+@FunctionalInterface
+public interface Collector<T> {
+    /**
+     * Emits a single item. Implementations must make sure this method is thread-safe.
+     *
+     * @param t the item to emit
+     */
+    void emit(T t);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
new file mode 100755
index 0000000..3a99e98
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/PartitionedEventCollector.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine;
+
+import org.apache.eagle.alert.engine.model.PartitionedEvent;
+
+/**
+ * A {@link Collector} of {@link PartitionedEvent}s that can also explicitly drop an event.
+ *
+ * <p>NOTE(review): the original author's note reads "Executed in thread-safe trace";
+ * presumably the methods are invoked from a thread-safe context. Confirm against the callers.
+ */
+public interface PartitionedEventCollector extends Collector<PartitionedEvent> {
+    /**
+     * Drops the given event.
+     *
+     * @param event the event to be dropped
+     */
+    void drop(PartitionedEvent event);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
new file mode 100644
index 0000000..609378b
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContext.java
@@ -0,0 +1,27 @@
+package org.apache.eagle.alert.engine;
+
+import backtype.storm.metric.api.MultiCountMetric;
+
+import com.typesafe.config.Config;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Context object exposing a Storm metric counter and the configuration.
+ */
+public interface StreamContext {
+    /**
+     * @return the {@link MultiCountMetric} held by this context
+     */
+    MultiCountMetric counter();
+
+    /**
+     * @return the {@link Config} held by this context
+     */
+    Config config();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
new file mode 100644
index 0000000..9ebcb60
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/StreamContextImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.eagle.alert.engine;
+
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.TopologyContext;
+
+import com.typesafe.config.Config;
+
+/**
+ * {@link StreamContext} implementation that simply holds the configuration and metric
+ * counter it was constructed with.
+ */
+public class StreamContextImpl implements StreamContext {
+    private final MultiCountMetric counter;
+    private final Config config;
+
+    /**
+     * @param config  configuration returned by {@link #config()}
+     * @param counter metric counter returned by {@link #counter()}
+     * @param context Storm topology context; accepted but not used by this implementation
+     */
+    public StreamContextImpl(Config config, MultiCountMetric counter, TopologyContext context) {
+        this.config = config;
+        this.counter = counter;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Config config() {
+        return config;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public MultiCountMetric counter() {
+        return counter;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
new file mode 100644
index 0000000..2bca329
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/UnitTopologyMain.java
@@ -0,0 +1,59 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine;
+
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
+import org.apache.eagle.alert.engine.coordinator.impl.ZKMetadataChangeNotifyService;
+import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
+
+import backtype.storm.generated.StormTopology;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Entry point of the unit topology (since 5/3/16). Makes sure the unit topology can be
+ * started either from the command line or remotely.
+ *
+ * <p>Parameters for starting a unit topology:
+ * <ol>
+ *   <li>number of spout tasks</li>
+ *   <li>number of router bolts</li>
+ *   <li>number of alert bolts</li>
+ *   <li>number of publish bolts</li>
+ * </ol>
+ *
+ * <p>Connections:
+ * <ol>
+ *   <li>spout and router bolt</li>
+ *   <li>router bolt and alert bolt</li>
+ *   <li>alert bolt and publish bolt</li>
+ * </ol>
+ */
+public class UnitTopologyMain {
+
+    /** Configuration key whose value is used as the topology id. */
+    private static final String TOPOLOGY_NAME_KEY = "topology.name";
+
+    /**
+     * Loads the default configuration via {@link ConfigFactory#load()} and runs the unit topology.
+     *
+     * @param args command-line arguments; not used
+     */
+    public static void main(String[] args) {
+        final Config config = ConfigFactory.load();
+        final ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+        final String topologyId = config.getString(TOPOLOGY_NAME_KEY);
+
+        newRunner(zkConfig, topologyId).run(topologyId, config);
+    }
+
+    /**
+     * Builds the unit topology for the given configuration.
+     *
+     * @param config configuration providing the ZooKeeper settings and the topology name
+     * @return the topology produced by {@code UnitTopologyRunner#buildTopology}
+     */
+    public static StormTopology createTopology(Config config) {
+        final ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+        final String topologyId = config.getString(TOPOLOGY_NAME_KEY);
+
+        return newRunner(zkConfig, topologyId).buildTopology(topologyId, config);
+    }
+
+    /** Creates a runner backed by a ZooKeeper metadata change notify service for the topology. */
+    private static UnitTopologyRunner newRunner(ZKConfig zkConfig, String topologyId) {
+        return new UnitTopologyRunner(new ZKMetadataChangeNotifyService(zkConfig, topologyId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
new file mode 100644
index 0000000..22fe56a
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/IMetadataChangeNotifyService.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+import java.io.Closeable;
+import java.io.Serializable;
+
+import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
+import org.apache.eagle.alert.engine.router.AlertBoltSpecListener;
+import org.apache.eagle.alert.engine.router.SpoutSpecListener;
+import org.apache.eagle.alert.engine.router.StreamRouterBoltSpecListener;
+
+import com.typesafe.config.Config;
+
+/**
+ * IMetadataChangeNotifyService defines the following features:
+ * <ol>
+ *   <li>initialization</li>
+ *   <li>registration of metadata change listeners</li>
+ * </ol>
+ *
+ * <p>In a distributed environment, for example the Storm platform, a class implementing
+ * this interface should have the following lifecycle:
+ * <ol>
+ *   <li>the object is created on the client machine</li>
+ *   <li>the object is serialized and transferred to another machine</li>
+ *   <li>the object is re-created through deserialization</li>
+ *   <li>{@code init()} is invoked to do initialization</li>
+ *   <li>the various {@code registerListener} methods are invoked to get notified of config changes</li>
+ *   <li>{@code close()} is invoked to release system resources</li>
+ * </ol>
+ */
+public interface IMetadataChangeNotifyService extends Closeable,Serializable {
+    /**
+     * Initializes this service; called after deserialization (see the lifecycle above).
+     *
+     * @param config configuration used for initialization
+     * @param type   metadata type this instance is initialized for
+     *               (NOTE(review): presumably selects which spec is watched; confirm in the implementation)
+     */
+    void init(Config config, MetadataType type);
+
+    /**
+     * Registers a spout spec listener.
+     *
+     * @param listener listener to register
+     */
+    void registerListener(SpoutSpecListener listener);
+
+    /**
+     * Registers an alert bolt spec listener.
+     *
+     * @param listener listener to register
+     */
+    void registerListener(AlertBoltSpecListener listener);
+
+    /**
+     * Registers a stream router bolt spec listener.
+     *
+     * @param listener listener to register
+     */
+    void registerListener(StreamRouterBoltSpecListener listener);
+
+    /**
+     * Registers an alert publish spec listener.
+     *
+     * @param listener listener to register
+     */
+    void registerListener(AlertPublishSpecListener listener);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
new file mode 100644
index 0000000..ac68e26
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/MetadataType.java
@@ -0,0 +1,30 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.coordinator;
+
+/**
+ * Types of metadata distinguished in the coordinator package; the constants name a spout,
+ * a stream router bolt, an alert bolt and an alert publish bolt.
+ *
+ * <p>Since 5/4/16.
+ */
+public enum MetadataType {
+    SPOUT,
+    STREAM_ROUTER_BOLT,
+    ALERT_BOLT,
+    ALERT_PUBLISH_BOLT
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
new file mode 100644
index 0000000..927eb8c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinitionNotFoundException.java
@@ -0,0 +1,25 @@
+package org.apache.eagle.alert.engine.coordinator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Checked exception reporting that no stream definition was found for a stream id.
+ */
+public class StreamDefinitionNotFoundException extends Exception {
+    private static final long serialVersionUID = 6027811718016485808L;
+
+    /**
+     * Creates the exception with the message {@code "Stream definition not found: " + streamId}.
+     *
+     * @param streamId id of the stream whose definition is missing
+     */
+    public StreamDefinitionNotFoundException(String streamId){
+        super("Stream definition not found: "+streamId);
+    }
+}
\ No newline at end of file