Posted to commits@eagle.apache.org by ra...@apache.org on 2016/08/16 08:31:41 UTC

[1/3] incubator-eagle git commit: EAGLE-421: JMX Metric APP uses new application framework

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 46afec395 -> d3a7e480a


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java
deleted file mode 100644
index c8096d6..0000000
--- a/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/TestHadoopMetricSiddhiQL.java
+++ /dev/null
@@ -1,354 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.hadoop.metric;
-
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-import org.wso2.siddhi.query.api.expression.constant.DoubleConstant;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Created on 1/17/16.
- */
-public class TestHadoopMetricSiddhiQL {
-
-    @Ignore
-    @Test
-    public void testNameNodeLag() throws Exception {
-        String ql = "define stream s (host string, timestamp long, metric string, component string, site string, value string);" +
-                " @info(name='query') " +
-                " from s[metric=='hadoop.namenode.dfs.lastwrittentransactionid' and host=='a' ]#window.externalTime(timestamp, 5 min) select * insert into tmp1;" +
-                " from s[metric=='hadoop.namenode.dfs.lastwrittentransactionid' and host=='b' ]#window.externalTime(timestamp, 5 min) select * insert into tmp2;" +
-                " from tmp1 , tmp2 select tmp1.timestamp as t1time, tmp2.timestamp as t2time, max(convert(tmp1.value, 'long')) - max(convert(tmp2.value, 'long')) as gap insert into tmp3;" +
-                " from tmp3[gap > 100] select * insert into tmp;"
-                ;
-
-        System.out.println("test name node lag with multiple streams defined!");
-        testQL(ql, generateNameNodeLagEvents(), -1, true);
-    }
-
-    @Ignore
-    @Test
-    public void testNameNodeLag2_patternMatching() throws Exception {
-        String ql =
-            " define stream s (host string, timestamp long, metric string, component string, site string, value string); " +
-            " @info(name='query') " +
-            " from every a = s[metric=='hadoop.namenode.dfs.lastwrittentransactionid'] " +
-            "         -> b = s[metric=='hadoop.namenode.dfs.lastwrittentransactionid' and b.host != a.host " +
-                    " and (convert(a.value, 'long') + 100) < convert(value, 'long') ] " +
-            " within 5 min select a.host as hostA, b.host as hostB insert into tmp; ";
-
-        testQL(ql, generateNameNodeLagEvents(), -1);
-    }
-
-    private void testQL(String ql, List<Event> events, int i) throws Exception {
-        testQL(ql, events, i, false);
-    }
-
-    private void testQL(String ql, List<Event> events, int eventHappenCount, boolean useStreamCallback) throws InterruptedException {
-        SiddhiManager sm = new SiddhiManager();
-        ExecutionPlanRuntime runtime = sm.createExecutionPlanRuntime(ql);
-
-        InputHandler input = runtime.getInputHandler("s");
-
-        final AtomicInteger count = new AtomicInteger(0);
-        final CountDownLatch latch = new CountDownLatch(1);
-        // use stream callback or query callback
-        if (useStreamCallback) {
-            runtime.addCallback("tmp", new StreamCallback() {
-                AtomicInteger round = new AtomicInteger();
-
-                @Override
-                public void receive(Event[] events) {
-                    count.incrementAndGet();
-                    round.incrementAndGet();
-                    System.out.println("event round: " + round.get() + " event count : " + events.length);
-                    printEvents(events);
-                    latch.countDown();
-                }
-            });
-        } else {
-            runtime.addCallback("query", new QueryCallback() {
-                AtomicInteger round = new AtomicInteger();
-
-                @Override
-                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                    count.incrementAndGet();
-                    round.incrementAndGet();
-                    System.out.println("event round: " + round.get() + " event count : " + inEvents.length);
-                    printEvents(inEvents);
-                    latch.countDown();
-                }
-            });
-        }
-
-        runtime.start();
-
-        for (Event e : events) {
-            input.send(e);
-        }
-
-        latch.await(10, TimeUnit.SECONDS);
-        Thread.sleep(3000);
-
-        System.out.println("callback count=" + count.get());
-        if (eventHappenCount >= 0) {
-            Assert.assertEquals(eventHappenCount, count.get());
-        } else {
-            Assert.assertTrue(count.get() > 0);
-        }
-
-        runtime.shutdown();
-        sm.shutdown();
-    }
-
-    private void printEvents(Event[] inEvents) {
-        for (Event e : inEvents) {
-            for(Object o : e.getData()) {
-                System.out.print(o);
-                System.out.print('\t');
-            }
-            System.out.println();
-        }
-    }
-
-    private List<Event> generateNameNodeLagEvents() {
-        List<Event> events = new LinkedList<>();
-
-        long base1 = System.currentTimeMillis();
-        long tbase1 = 1000;
-        long tbase2 = 1000;
-
-        int SIZE = 10;
-        // master / slave in sync, no events for these
-        for (int i = 0;i < SIZE; i++) {
-            base1 += 1000;
-            tbase1 += 100;
-            Event e = new Event();
-            e.setData(new Object[] {"a", base1, "hadoop.namenode.dfs.lastwrittentransactionid", "namenode", "sandbox", String.valueOf(tbase1)});
-            events.add(e);
-
-            tbase2 += 100;
-            e = new Event();
-            e.setData(new Object[] {"b", base1, "hadoop.namenode.dfs.lastwrittentransactionid", "namenode", "sandbox", String.valueOf(tbase2)});
-            events.add(e);
-        }
-
-
-        {
-            // make sure the previous windows are flushed
-
-            base1 += 6 * 60 * 1000;
-            tbase1 = 3000;
-            Event e = new Event();
-            e.setData(new Object[]{"a", base1, "hadoop.namenode.dfs.lastwrittentransactionid", "namenode", "sandbox", String.valueOf(tbase1)});
-            events.add(e);
-
-            tbase2 = tbase1 + 110; // > 100, trigger an event
-            e = new Event();
-            e.setData(new Object[]{"b", base1, "hadoop.namenode.dfs.lastwrittentransactionid", "namenode", "sandbox", String.valueOf(tbase2)});
-            events.add(e);
-
-            // trigger event
-//            base1 = base1 + 6 * 60 * 1000;
-//            e = new Event();
-//            e.setData(new Object[]{"b", base1, "hadoop.namenode.dfs.lastwrittentransactionid", "namenode", "sandbox", String.valueOf(tbase2)});
-//            events.add(e);
-        }
-
-        return events;
-    }
-
-    /**
-    E.g. Alert if temperature of a room increases by 5 degrees within 10 min.
-            from every( e1=TempStream ) -> e2=TempStream[e1.roomNo==roomNo and (e1.temp + 5) <= temp ]
-                within 10 min
-            select e1.roomNo, e1.temp as initialTemp, e2.temp as finalTemp
-            insert into AlertStream;
-     */
-    @Ignore
-    @Test
-    public void testCase4_LiveDataNodeJoggle() throws Exception {
-
-        String ql = "define stream s (host string, timestamp long, metric string, component string, site string, value string);" +
-                " @info(name='query') " +
-                " from every (e1 = s[metric == \"hadoop.namenode.fsnamesystemstate.numlivedatanodes\" ]) -> " +
-                "             e2 = s[metric == e1.metric and host == e1.host and (convert(e1.value, \"long\") + 5) <= convert(value, \"long\") ]" +
-                " within 5 min " +
-                " select e1.metric, e1.host, e1.value as lowNum, e1.timestamp as start, e2.value as highNum, e2.timestamp as end " +
-                " insert into tmp;"
-                ;
-
-        testQL(ql, generateDataNodeJoggleEvents(), -1);
-    }
-
-    private List<Event> generateDataNodeJoggleEvents() {
-        List<Event> events = new LinkedList<>();
-
-        long base1 = System.currentTimeMillis();
-        long tbase1 = 1000;
-        long tbase2 = 5000;
-
-        int SIZE = 10;
-        // master / slave in sync
-        for (int i = 0;i < SIZE; i++) {
-            base1 += 1000;
-
-            Event e = new Event();
-            e.setData(new Object[] {"a", base1, "hadoop.namenode.fsnamesystemstate.numlivedatanodes", "namenode", "sandbox", String.valueOf(tbase1)});
-            events.add(e);
-
-            // inject host b events to verify host a is not disturbed by this metric stream
-            e = new Event();
-            e.setData(new Object[] {"b", base1, "hadoop.namenode.fsnamesystemstate.numlivedatanodes", "namenode", "sandbox", String.valueOf(tbase2)});
-            events.add(e);
-        }
-
-        {
-            // insert an anomalous value jump for host a
-            base1 += 1 * 60 * 1000;
-            tbase1 = 3000;
-            Event e = new Event();
-            e.setData(new Object[]{"a", base1, "hadoop.namenode.fsnamesystemstate.numlivedatanodes", "namenode", "sandbox", String.valueOf(tbase1)});
-            events.add(e);
-
-            // trigger event; we don't really care about this event's value, just make sure the metric above is triggered
-            base1 = base1 + 100;
-            e = new Event();
-            e.setData(new Object[]{"b", base1, "hadoop.namenode.dfs.lastwrittentransactionid", "namenode", "sandbox", String.valueOf(tbase2)});
-            events.add(e);
-        }
-
-        return events;
-    }
-
-    @Test
-    public void testMissingBlocks() throws Exception {
-        String sql = " define stream s (host string, timestamp long, metric string, component string, site string, value double); " +
-                " @info(name='query') " +
-                " from s[metric == \"hadoop.namenode.dfs.missingblocks\" and convert(value, 'long') > 0]#window.externalTime(timestamp, 10 min) select metric, host, value, timestamp, component, site insert into tmp; ";
-
-        System.out.println(sql);
-
-        testQL(sql, generateMBEvents(), -1);
-    }
-
-    private List<Event> generateMBEvents() {
-        List<Event> events = new LinkedList<>();
-
-        long base1 = System.currentTimeMillis();
-        int SIZE = 3;
-        // master / slave in sync
-        for (int i = 0;i < SIZE; i++) {
-            base1 = base1 +1000;
-
-            Event e = new Event();
-            e.setData(new Object[] {"a", base1, "hadoop.namenode.dfs.missingblocks", "namenode", "sandbox", 0.0});
-            events.add(e);
-
-            // inject host b events to verify host a is not disturbed by this metric stream
-            e = new Event();
-            e.setData(new Object[] {"b", base1, "hadoop.namenode.dfs.missingblocks", "namenode", "sandbox", 1.0});
-            events.add(e);
-        }
-        return events;
-    }
-
-    @Test
-    public void testLastCheckpointTime() throws Exception {
-        String ql = " define stream s (host string, timestamp long, metric string, component string, site string, value double); " +
-                " @info(name='query') " +
-                " from s[metric == \"hadoop.namenode.dfs.lastcheckpointtime\" and (convert(value, \"long\") + 18000000) < timestamp]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp;";
-
-        testQL(ql, generateLCPEvents(), -1);
-    }
-
-    private List<Event> generateLCPEvents() {
-        List<Event> events = new LinkedList<>();
-
-        long base1 = System.currentTimeMillis();
-        int SIZE = 3;
-        // master / slave in sync
-        for (int i = 0;i < SIZE; i++) {
-            base1 = base1 +1000;
-
-            Event e = new Event();
-            e.setData(new Object[] {"a", base1, "hadoop.namenode.dfs.lastcheckpointtime", "namenode", "sandbox", Double.valueOf(base1 - 18000000 - 1)});
-            events.add(e);
-
-            // inject host b events to verify host a is not disturbed by this metric stream
-            e = new Event();
-            e.setData(new Object[] {"b", base1, "hadoop.namenode.dfs.lastcheckpointtime", "namenode", "sandbox", Double.valueOf(base1 - 18000000 - 1)});
-            events.add(e);
-        }
-        return events;
-    }
-
-    @Test
-    public void testNoActiveNamenodeFor3Times() throws Exception {
-        String sql = " define stream s (host string, timestamp long, metric string, component string, site string, value double); " +
-                " @info(name='query') " +
-                " from s[metric == \"hadoop.namenode.hastate.active.count\"]#window.length(3) select  metric, host, value, timestamp, component, site, avg(convert(value, \"long\")) as avgValue, count() as cnt having avgValue==0 and cnt==3  insert into tmp;";
-//        " from s[metric == \"hadoop.namenode.hastate.active.count\"]#window.length(3) select  metric, host, value, timestamp, component, site, min(convert(value, \"long\")) as minValue, max(convert(value, \"long\")) as maxValue, count() as cnt having minValue==0 and maxValue==0 and cnt==3  insert into tmp;";
-
-        System.out.println(sql);
-
-        testQL(sql, generateMBEvents_times_0(1), 0);
-        testQL(sql, generateMBEvents_times_0(2), 0);
-        testQL(sql, generateMBEvents_times_0(3), 1);
-    }
-
-    private List<Event> generateMBEvents_times_0(int times_0) {
-        List<Event> events = new LinkedList<>();
-
-        long base1 = System.currentTimeMillis();
-        double[] values = new double[3];
-        if(times_0 == 1){
-            values[0] = 1.0;
-            values[1] = 0.0;
-            values[2] = 1.0;
-        }else if(times_0 == 2){
-            values[0] = 1.0;
-            values[1] = 0.0;
-            values[2] = 0.0;
-        }else if(times_0 == 3){
-            values[0] = 0.0;
-            values[1] = 0.0;
-            values[2] = 0.0;
-        }
-        for(int i=0; i<3; i++) {
-            // master / slave in sync
-            base1 = base1 + 1000;
-            Event e = new Event();
-            e.setData(new Object[]{"a", base1, "hadoop.namenode.hastate.active.count", "namenode", "sandbox", values[i]});
-            events.add(e);
-        }
-        return events;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/test/resources/cassandra.json
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/resources/cassandra.json b/eagle-hadoop-metric/src/test/resources/cassandra.json
deleted file mode 100644
index 645e314..0000000
--- a/eagle-hadoop-metric/src/test/resources/cassandra.json
+++ /dev/null
@@ -1,13 +0,0 @@
-{
-  "host": "/192.168.6.227",
-  "source": "/192.168.6.227",
-  "user": "jaspa",
-  "timestamp": 1455574202864,
-  "category": "QUERY",
-  "type": "CQL_SELECT",
-  "ks": "dg_keyspace",
-  "cf": "customer_details",
-  "operation": "CQL_SELECT",
-  "masked_columns": ["bank", "ccno", "email", "ip", "name", "sal", "ssn ", "tel", "url"],
-  "other_columns": ["id", "npi"]
-}
\ No newline at end of file


[3/3] incubator-eagle git commit: EAGLE-421: JMX Metric APP uses new application framework

Posted by ra...@apache.org.
EAGLE-421: JMX Metric APP uses new application framework

Remove the topology code since the JMX app can be handled directly by the new alert engine.

This closes #344

Squashed commit of the following:

commit 1517c132658408952e790e27565496b9587da06b
Author: Ralph, Su <su...@gmail.com>
Date:   Mon Aug 15 11:28:56 2016 +0800

    EAGLE-421: JMX Metric APP uses new application framework

    Remove the topology code since the JMX app can be handled directly by the new alert engine.
Merge branch 'eagle421' into develop
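
For reference, a minimal, self-contained sketch of evaluating one of these JMX-metric policies directly in a Siddhi runtime, which is essentially what the new alert engine does once a policy is registered. The stream definition and query are copied from the deleted TestHadoopMetricSiddhiQL#testMissingBlocks test; the class name MissingBlocksPolicySketch, the in-process setup, and the sample event values are illustrative assumptions only.

import org.wso2.siddhi.core.ExecutionPlanRuntime;
import org.wso2.siddhi.core.SiddhiManager;
import org.wso2.siddhi.core.event.Event;
import org.wso2.siddhi.core.stream.input.InputHandler;
import org.wso2.siddhi.core.stream.output.StreamCallback;

public class MissingBlocksPolicySketch {
    public static void main(String[] args) throws Exception {
        // Stream definition and query copied from the deleted testMissingBlocks test.
        String ql = "define stream s (host string, timestamp long, metric string, component string, site string, value double); "
                + " @info(name='query') "
                + " from s[metric == \"hadoop.namenode.dfs.missingblocks\" and convert(value, 'long') > 0]#window.externalTime(timestamp, 10 min)"
                + " select metric, host, value, timestamp, component, site insert into tmp; ";

        SiddhiManager manager = new SiddhiManager();
        ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(ql);

        // Print every event that matches the policy.
        runtime.addCallback("tmp", new StreamCallback() {
            @Override
            public void receive(Event[] events) {
                for (Event e : events) {
                    System.out.println(java.util.Arrays.toString(e.getData()));
                }
            }
        });

        runtime.start();
        InputHandler input = runtime.getInputHandler("s");

        long now = System.currentTimeMillis();
        // Illustrative events: host "b" reports one missing block and should match the policy.
        Event ok = new Event();
        ok.setData(new Object[]{"a", now, "hadoop.namenode.dfs.missingblocks", "namenode", "sandbox", 0.0});
        input.send(ok);

        Event alert = new Event();
        alert.setData(new Object[]{"b", now + 1000, "hadoop.namenode.dfs.missingblocks", "namenode", "sandbox", 1.0});
        input.send(alert);

        Thread.sleep(1000);
        runtime.shutdown();
        manager.shutdown();
    }
}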


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/d3a7e480
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/d3a7e480
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/d3a7e480

Branch: refs/heads/develop
Commit: d3a7e480af180a6e72f30dd826577e60f2e0e5f9
Parents: 46afec3 1517c13
Author: Ralph, Su <su...@gmail.com>
Authored: Tue Aug 16 16:31:27 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Tue Aug 16 16:31:27 2016 +0800

----------------------------------------------------------------------

----------------------------------------------------------------------



[2/3] incubator-eagle git commit: EAGLE-421: JMX Metric APP uses new application framework

Posted by ra...@apache.org.
EAGLE-421: JMX Metric APP uses new application framework

Remove the topology code since the JMX app can be handled directly by the new alert engine.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/1517c132
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/1517c132
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/1517c132

Branch: refs/heads/develop
Commit: 1517c132658408952e790e27565496b9587da06b
Parents: b4732cb
Author: Ralph, Su <su...@gmail.com>
Authored: Mon Aug 15 11:28:56 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Mon Aug 15 11:28:56 2016 +0800

----------------------------------------------------------------------
 eagle-hadoop-metric/pom.xml                     |  14 +-
 .../hadoop/metric/HadoopJmxApplication.java     |  74 ----
 .../eagle/hadoop/metric/JsonParserBolt.java     |  62 ----
 .../src/main/resources/application.conf         |  51 ---
 .../resources/capacityused-policy-import.sh     |  51 ---
 .../src/main/resources/datasources.json         |  19 +
 .../src/main/resources/eagle-env.sh             |  44 ---
 .../src/main/resources/hadoop-metric-init.sh    | 180 ----------
 .../src/main/resources/hadoopjmx.yaml           |  18 -
 .../src/main/resources/hastate-policy-import.sh |  51 ---
 .../lastcheckpointtime-policy-import.sh         |  51 ---
 .../src/main/resources/log4j.properties         |  35 --
 .../resources/missingblock-policy-import.sh     |  51 ---
 .../main/resources/namenodelag-policy-import.sh |  49 ---
 .../main/resources/nodecount-policy-import.sh   |  51 ---
 .../src/main/resources/policies.json            | 169 +++++++++
 .../src/main/resources/publishments.json        |  33 ++
 .../resources/safemodecheck-policy-import.sh    |  51 ---
 .../sanbox/capacityused-policy-import.sh        |  51 ---
 .../resources/sanbox/hastate-policy-import.sh   |  51 ---
 .../sanbox/lastcheckpointtime-policy-import.sh  |  51 ---
 .../sanbox/missingblock-policy-import.sh        |  51 ---
 .../sanbox/namenodehanoactive-policy-import.sh  |  52 ---
 ...nodehawithmorethanoneactive-policy-import.sh |  53 ---
 .../sanbox/namenodelag-policy-import.sh         |  49 ---
 .../resources/sanbox/nodecount-policy-import.sh |  51 ---
 .../resourcemanagerhanoactive-policy-import.sh  |  53 ---
 ...agerhawithmorethanoneactive-policy-import.sh |  54 ---
 .../sanbox/safemodecheck-policy-import.sh       |  51 ---
 .../src/main/resources/streamdefinitions.json   |  47 +++
 .../metric/HadoopJmxMetricDeserializerTest.java |  36 --
 .../hadoop/metric/TestHadoopMetricSiddhiQL.java | 354 -------------------
 .../src/test/resources/cassandra.json           |  13 -
 33 files changed, 271 insertions(+), 1800 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml
index 15eea00..b397e37 100644
--- a/eagle-hadoop-metric/pom.xml
+++ b/eagle-hadoop-metric/pom.xml
@@ -15,7 +15,8 @@
   See the License for the specific language governing permissions and
   limitations under the License.
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
         <artifactId>eagle-parent</artifactId>
         <groupId>org.apache.eagle</groupId>
@@ -25,15 +26,6 @@
 
     <artifactId>eagle-hadoop-metric</artifactId>
     <dependencies>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-stream-process-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-app-base</artifactId>
-            <version>${project.version}</version>
-        </dependency>
     </dependencies>
+
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
deleted file mode 100644
index 40d1a24..0000000
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.hadoop.metric;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.app.StormApplication;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.sink.StormStreamSink;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
-import storm.kafka.StringScheme;
-
-/**
- * Since 8/12/16.
- * This application just passes JMX metric data through.
- * For persistence or alerting purposes it is not necessary to start this application,
- * but it is kept in case of future business processing needs.
- *
- * Note: this application should be run as multiple instances, one per data source topic.
- */
-public class HadoopJmxApplication extends StormApplication {
-    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
-    public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
-    public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
-
-    @Override
-    public StormTopology execute(Config config, StormEnvironment environment) {
-        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
-        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
-        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
-
-        TopologyBuilder builder = new TopologyBuilder();
-
-        KafkaSpoutProvider provider = new KafkaSpoutProvider();
-        IRichSpout spout = provider.getSpout(config);
-        builder.setSpout("ingest", spout, numOfSpoutTasks);
-
-        JsonParserBolt bolt = new JsonParserBolt();
-        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks);
-        boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
-
-        StormStreamSink sinkBolt = environment.getStreamSink("hadoop_jmx_stream",config);
-        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
-        kafkaBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
-
-        return builder.createTopology();
-    }
-
-    public static void main(String[] args){
-        Config config = ConfigFactory.load();
-        HadoopJmxApplication app = new HadoopJmxApplication();
-        app.run(config);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java
deleted file mode 100644
index 7ca5ba6..0000000
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.hadoop.metric;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.Map;
-
-/**
- * Since 8/14/16.
- */
-public class JsonParserBolt extends BaseRichBolt {
-    private Logger LOG = LoggerFactory.getLogger(JsonParserBolt.class);
-    private OutputCollector collector;
-    private ObjectMapper mapper = new ObjectMapper();
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        String msg = input.getString(0);
-        try {
-            Map ret = mapper.readValue(msg, Map.class);
-            collector.emit(Arrays.asList(ret));
-        }catch(Exception ex){
-            LOG.error("error in passing json message", ex);
-        }finally{
-            collector.ack(input);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("msg"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/application.conf b/eagle-hadoop-metric/src/main/resources/application.conf
deleted file mode 100644
index e75355f..0000000
--- a/eagle-hadoop-metric/src/main/resources/application.conf
+++ /dev/null
@@ -1,51 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-  "appId" : "HadoopJmxApplication",
-  "mode" : "LOCAL",
-  "siteId" : "testsite",
-  "topology" : {
-    "numOfSpoutTasks" : 2,
-    "numOfParserTasks" : 2,
-    "numOfSinkTasks" : 2
-  },
-  "dataSourceConfig": {
-    "topic" : "jmx_metric",
-    "zkConnection" : "server.eagle.apache.org:2181",
-    "zkConnectionTimeoutMS" : 15000,
-    "consumerGroupId" : "EagleConsumer",
-    "fetchSize" : 1048586,
-    "transactionZKServers" : "server.eagle.apache.org",
-    "transactionZKPort" : 2181,
-    "transactionZKRoot" : "/consumers",
-    "transactionStateUpdateMS" : 2000
-    "schemeCls" : "storm.kafka.StringScheme"
-  },
-  "eagleProps" : {
-    "eagleService": {
-      "host": "localhost",
-      "port": 9090,
-      "username": "admin",
-      "password": "secret"
-    }
-  },
-  "dataSinkConfig": {
-    "topic" : "jmx_metric_parsed",
-    "brokerList" : "server.eagle.apache.org:6667",
-    "serializerClass" : "kafka.serializer.StringEncoder",
-    "keySerializerClass" : "kafka.serializer.StringEncoder"
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/capacityused-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/capacityused-policy-import.sh b/eagle-hadoop-metric/src/main/resources/capacityused-policy-import.sh
deleted file mode 100644
index 71e2756..0000000
--- a/eagle-hadoop-metric/src/main/resources/capacityused-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: capacityUsedPolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "capacityUsedPolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[metric == \\\"hadoop.namenode.fsnamesystemstate.capacityused\\\" and convert(value, \\\"long\\\") > 0]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/datasources.json b/eagle-hadoop-metric/src/main/resources/datasources.json
new file mode 100644
index 0000000..5c10696
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/resources/datasources.json
@@ -0,0 +1,19 @@
+[
+  {
+    "name": "hadoop_jmx_datasource",
+    "type": "KAFKA",
+    "properties": {
+    },
+    "topic": "hadoop_jmx_metrics",
+    "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+    "codec": {
+      "streamNameSelectorProp": {
+        "userProvidedStreamName" : "hadoopJmxMetricEventStream",
+        "streamNameFormat":"%s"
+      },
+      "streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn": "timestamp",
+      "timestampFormat": ""
+    }
+  }
+]
\ No newline at end of file
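
The datasources.json above binds the alert engine to the hadoop_jmx_metrics Kafka topic, decodes each message as flat JSON, and names the stream hadoopJmxMetricEventStream. A minimal sketch of publishing one such event follows, assuming the standard Apache Kafka Java producer client; the broker address, the class name JmxMetricEventProducerSketch, and the event values are illustrative only.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class JmxMetricEventProducerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // One flat JSON event with the fields used by hadoopJmxMetricEventStream.
        String event = "{\"host\":\"nn-host-1\",\"timestamp\":" + System.currentTimeMillis()
                + ",\"metric\":\"hadoop.namenode.dfs.missingblocks\","
                + "\"component\":\"namenode\",\"site\":\"sandbox\",\"value\":1.0}";

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("hadoop_jmx_metrics", event)).get();
        }
    }
}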

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/eagle-env.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/eagle-env.sh b/eagle-hadoop-metric/src/main/resources/eagle-env.sh
deleted file mode 100755
index 79ff5fa..0000000
--- a/eagle-hadoop-metric/src/main/resources/eagle-env.sh
+++ /dev/null
@@ -1,44 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# set EAGLE_HOME
-export EAGLE_HOME=$(dirname $0)/..
-
-# The java implementation to use. please use jdk 1.7 or later
-# export JAVA_HOME=${JAVA_HOME}
-# export JAVA_HOME=/usr/java/jdk1.7.0_80/
-
-# nimbus.host, default is localhost
-export EAGLE_NIMBUS_HOST=localhost
-
-# EAGLE_SERVICE_HOST, default is `hostname -f`
-export EAGLE_SERVICE_HOST=localhost
-
-# EAGLE_SERVICE_PORT, default is 9099
-export EAGLE_SERVICE_PORT=9099
-
-# EAGLE_SERVICE_USER
-export EAGLE_SERVICE_USER=admin
-
-# EAGLE_SERVICE_PASSWORD
-export EAGLE_SERVICE_PASSWD=secret
-
-export EAGLE_CLASSPATH=$EAGLE_HOME/conf
-# Add eagle shared library jars
-for file in $EAGLE_HOME/lib/share/*;do
-	EAGLE_CLASSPATH=$EAGLE_CLASSPATH:$file
-done

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh b/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh
deleted file mode 100755
index 849f462..0000000
--- a/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh
+++ /dev/null
@@ -1,180 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-
-#####################################################################
-#            Import stream metadata for HDFS
-#####################################################################
-
-## AlertDataSource: data sources bound to sites
-echo "Importing AlertDataSourceService for persist... "
-
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=SiteApplicationService" \
-  -d '
-  [
-     {
-        "tags":{
-           "site":"sandbox",
-           "application":"hadoopJmxMetricDataSource"
-        },
-        "enabled": true,
-        "config": "web.druid.coordinator=coordinatorHost:port\nweb.druid.broker=brokerHost:port"
-     }
-  ]
-  '
-
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=ApplicationDescService" \
-  -d '
-  [
-     {
-        "tags":{
-           "application":"hadoopJmxMetricDataSource"
-        },
-        "description":"hadoop jmx metric monitoring",
-        "alias":"JmxMetricMonitor",
-        "groupName":"METRIC",
-        "config":"{}",
-        "features":["common","metadata"]
-     }
-  ]
-  '
-
-## AlertStreamService: alert streams generated from data source
-echo ""
-echo "Importing AlertStreamService for HDFS... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamService" \
- -d '
- [
-    {
-       "prefix":"alertStream",
-       "tags":{
-          "application":"hadoopJmxMetricDataSource",
-          "streamName":"hadoopJmxMetricEventStream"
-       },
-       "description":"hadoop"
-    }
- ]
- '
-
-## AlertExecutorService: what alert streams are consumed by alert executor
-echo ""
-echo "Importing AlertExecutorService for HDFS... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertExecutorService" \
- -d '
- [
-    {
-       "prefix":"alertExecutor",
-       "tags":{
-          "application":"hadoopJmxMetricDataSource",
-          "alertExecutorId":"hadoopJmxMetricAlertExecutor",
-          "streamName":"hadoopJmxMetricEventStream"
-       },
-       "description":"aggregate executor for hadoop jmx metric event stream"
-    }
- ]
- '
-
-## AlertStreamSchemaService: schema for event from alert stream
-echo ""
-echo "Importing AlertStreamSchemaService for HDFS... "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
-"http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertStreamSchemaService" \
- -d '
- [
-    {
-       "prefix": "alertStreamSchema",
-       "tags": {
-          "application": "hadoopJmxMetricDataSource",
-          "streamName": "hadoopJmxMetricEventStream",
-          "attrName": "host"
-       },
-       "attrDescription": "the host that current metric comes form",
-       "attrType": "string",
-       "category": "",
-       "attrValueResolver": ""
-    },
-    {
-       "prefix": "alertStreamSchema",
-       "tags": {
-          "application": "hadoopJmxMetricDataSource",
-          "streamName": "hadoopJmxMetricEventStream",
-          "attrName": "timestamp"
-       },
-       "attrDescription": "the metric timestamp",
-       "attrType": "long",
-       "category": "",
-       "attrValueResolver": ""
-    },
-    {
-       "prefix": "alertStreamSchema",
-       "tags": {
-          "application": "hadoopJmxMetricDataSource",
-          "streamName": "hadoopJmxMetricEventStream",
-          "attrName": "metric"
-       },
-       "attrDescription": "the metric name",
-       "attrType": "string",
-       "category": "",
-       "attrValueResolver": ""
-    },
-    {
-       "prefix": "alertStreamSchema",
-       "tags": {
-          "application": "hadoopJmxMetricDataSource",
-          "streamName": "hadoopJmxMetricEventStream",
-          "attrName": "component"
-       },
-       "attrDescription": "the component that the metric comes from",
-       "attrType": "string",
-       "category": "",
-       "attrValueResolver": ""
-    },
-    {
-       "prefix": "alertStreamSchema",
-       "tags": {
-          "application": "hadoopJmxMetricDataSource",
-          "streamName": "hadoopJmxMetricEventStream",
-          "attrName": "site"
-       },
-       "attrDescription": "the site that the metric belongs to",
-       "attrType": "string",
-       "category": "",
-       "attrValueResolver": ""
-    },
-    {
-       "prefix": "alertStreamSchema",
-       "tags": {
-          "application": "hadoopJmxMetricDataSource",
-          "streamName": "hadoopJmxMetricEventStream",
-          "attrName": "value"
-       },
-       "attrDescription": "the metric value in string presentation",
-       "attrType": "double",
-       "category": "",
-       "attrValueResolver": ""
-    }
- ]
- '
-
-## Finished
-echo ""
-echo "Finished initialization for eagle topology"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml b/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml
deleted file mode 100644
index a68a323..0000000
--- a/eagle-hadoop-metric/src/main/resources/hadoopjmx.yaml
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-topology.workers: 1
-topology.acker.executors: 1
-topology.tasks: 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh b/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh
deleted file mode 100644
index b125e37..0000000
--- a/eagle-hadoop-metric/src/main/resources/hastate-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: haStatePolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "haStatePolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from every a = hadoopJmxMetricEventStream[metric==\\\"hadoop.namenode.fsnamesystem.hastate\\\"] -> b = hadoopJmxMetricEventStream[metric==a.metric and b.host == a.host and (convert(a.value, \\\"long\\\") != convert(value, \\\"long\\\"))] within 10 min select a.host, a.value as oldHaState, b.value as newHaState, b.timestamp as timestamp, b.metric as metric, b.component as component, b.site as site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/lastcheckpointtime-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/lastcheckpointtime-policy-import.sh b/eagle-hadoop-metric/src/main/resources/lastcheckpointtime-policy-import.sh
deleted file mode 100644
index 333367c..0000000
--- a/eagle-hadoop-metric/src/main/resources/lastcheckpointtime-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: lastCheckPointTimePolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "lastCheckPointTimePolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[metric == \\\"hadoop.namenode.dfs.lastcheckpointtime\\\" and (convert(value, \\\"long\\\") + 18000000) < timestamp]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"last check point time lag found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/log4j.properties b/eagle-hadoop-metric/src/main/resources/log4j.properties
deleted file mode 100644
index 149caa7..0000000
--- a/eagle-hadoop-metric/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout, DRFA
-
-eagle.log.dir=./logs
-eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
-log4j.appender.DRFA.DatePattern=yyyy-MM-dd
-## 30-day backup
-# log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/missingblock-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/missingblock-policy-import.sh b/eagle-hadoop-metric/src/main/resources/missingblock-policy-import.sh
deleted file mode 100644
index 9a5e924..0000000
--- a/eagle-hadoop-metric/src/main/resources/missingblock-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: missingBlockPolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "missingBlockPolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[metric == \\\"hadoop.namenode.dfs.missingblocks\\\" and convert(value, \\\"long\\\") > 0]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/namenodelag-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/namenodelag-policy-import.sh b/eagle-hadoop-metric/src/main/resources/namenodelag-policy-import.sh
deleted file mode 100644
index 45de4b8..0000000
--- a/eagle-hadoop-metric/src/main/resources/namenodelag-policy-import.sh
+++ /dev/null
@@ -1,49 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-echo ""
-echo "Importing Policy: NameNodeLagPolicy"
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "NameNodeLagPolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from every a = hadoopJmxMetricEventStream[metric==\\\"hadoop.namenode.journaltransaction.lastappliedorwrittentxid\\\"] -> b = hadoopJmxMetricEventStream[metric==a.metric and b.host != a.host and (max(convert(a.value, \\\"long\\\")) + 100) <= max(convert(value, \\\"long\\\"))] within 5 min select a.host as hostA, a.value as transactIdA, b.host as hostB, b.value as transactIdB insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"name node lag found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/nodecount-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/nodecount-policy-import.sh b/eagle-hadoop-metric/src/main/resources/nodecount-policy-import.sh
deleted file mode 100644
index 589a44e..0000000
--- a/eagle-hadoop-metric/src/main/resources/nodecount-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: dataNodeCountPolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "dataNodeCountPolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from every (e1 = hadoopJmxMetricEventStream[metric == \\\"hadoop.namenode.fsnamesystemstate.numlivedatanodes\\\" ]) -> e2 = hadoopJmxMetricEventStream[metric == e1.metric and host == e1.host and (convert(e1.value, \\\"long\\\") - 5) >= convert(value, \\\"long\\\") ] within 5 min select e1.metric, e1.host, e1.value as highNum, e1.timestamp as start, e2.value as lowNum, e2.timestamp as end insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"node count joggling found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/policies.json
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/policies.json b/eagle-hadoop-metric/src/main/resources/policies.json
new file mode 100644
index 0000000..811ffb0
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/resources/policies.json
@@ -0,0 +1,169 @@
+[
+  {
+    "name": "haStatePolicy",
+    "description": "ha state check for hadoop name node",
+    "definition": {
+      "inputStreams": [
+        "hadoopJmxMetricEventStream"
+      ],
+      "outputStreams": [
+        "tmp"
+      ],
+      "type": "siddhi",
+      "value": " from every a = hadoopJmxMetricEventStream[metric==\"hadoop.namenode.fsnamesystem.hastate\"] -> b = hadoopJmxMetricEventStream[metric==a.metric and b.host == a.host and (convert(a.value, \"long\\\") != convert(value, \"long\"))] within 10 min select a.host, a.value as oldHaState, b.value as newHaState, b.timestamp as timestamp, b.metric as metric, b.component as component, b.site as site insert into tmp; "
+    },
+    "partitionSpec": [
+      {
+        "streamId": "hadoopJmxMetricEventStream",
+        "type": "GROUPBY",
+        "columns": [
+          "host"
+        ]
+      }
+    ]
+  },
+
+  {
+    "name": "capacityUsedPolicy",
+    "description": "capacity usage check for hadoop cluster",
+    "definition": {
+      "inputStreams": [
+        "hadoopJmxMetricEventStream"
+      ],
+      "outputStreams": [
+        "tmp"
+      ],
+      "type": "siddhi",
+      "value": " from hadoopJmxMetricEventStream[metric == \"hadoop.namenode.fsnamesystemstate.capacityused\" and convert(value, \"long\") > 0]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp; "
+    },
+    "partitionSpec": [
+      {
+        "streamId": "hadoopJmxMetricEventStream",
+        "type": "GROUPBY",
+        "columns": [
+          "host"
+        ]
+      }
+    ]
+  },
+
+  {
+    "name": "lastCheckPointTimePolicy",
+    "description": "last check point interval check for hadoop name node pair",
+    "definition": {
+      "inputStreams": [
+        "hadoopJmxMetricEventStream"
+      ],
+      "outputStreams": [
+        "tmp"
+      ],
+      "type": "siddhi",
+      "value": " from hadoopJmxMetricEventStream[metric == \"hadoop.namenode.dfs.lastcheckpointtime\" and (convert(value, \"long\") + 18000000) < timestamp]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp;  "
+    },
+    "partitionSpec": [
+      {
+        "streamId": "hadoopJmxMetricEventStream",
+        "type": "GROUPBY",
+        "columns": [
+          "host"
+        ]
+      }
+    ]
+  },
+
+  {
+    "name": "missingBlockPolicy",
+    "description": "missing block number check for hadoop cluster",
+    "definition": {
+      "inputStreams": [
+        "hadoopJmxMetricEventStream"
+      ],
+      "outputStreams": [
+        "tmp"
+      ],
+      "type": "siddhi",
+      "value": " from hadoopJmxMetricEventStream[metric == \"hadoop.namenode.dfs.missingblocks\" and convert(value, \"long\") > 0]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp;  "
+    },
+    "partitionSpec": [
+      {
+        "streamId": "hadoopJmxMetricEventStream",
+        "type": "GROUPBY",
+        "columns": [
+          "host"
+        ]
+      }
+    ]
+  },
+
+  {
+    "name": "namenodeTxLagPolicy",
+    "description": "name node tx log lag check",
+    "definition": {
+      "inputStreams": [
+        "hadoopJmxMetricEventStream"
+      ],
+      "outputStreams": [
+        "tmp"
+      ],
+      "type": "siddhi",
+      "value": " from every a = hadoopJmxMetricEventStream[metric==\"hadoop.namenode.journaltransaction.lastappliedorwrittentxid\"] -> b = hadoopJmxMetricEventStream[metric==a.metric and b.host != a.host and (max(convert(a.value, \"long\")) + 100) <= max(convert(value, \"long\"))] within 5 min select a.host as hostA, a.value as transactIdA, b.host as hostB, b.value as transactIdB insert into tmp; "
+    },
+    "partitionSpec": [
+      {
+        "streamId": "hadoopJmxMetricEventStream",
+        "type": "GROUPBY",
+        "columns": [
+          "host"
+        ]
+      }
+    ]
+  },
+
+  {
+    "name": "nodecountPolicy",
+    "description": "data node number check for hadoop cluster",
+    "definition": {
+      "inputStreams": [
+        "hadoopJmxMetricEventStream"
+      ],
+      "outputStreams": [
+        "tmp"
+      ],
+      "type": "siddhi",
+      "value": " from every (e1 = hadoopJmxMetricEventStream[metric == \"hadoop.namenode.fsnamesystemstate.numlivedatanodes\" ]) -> e2 = hadoopJmxMetricEventStream[metric == e1.metric and host == e1.host and (convert(e1.value, \"long\") - 5) >= convert(value, \"long\") ] within 5 min select e1.metric, e1.host, e1.value as highNum, e1.timestamp as start, e2.value as lowNum, e2.timestamp as end insert into tmp;  "
+    },
+    "partitionSpec": [
+      {
+        "streamId": "hadoopJmxMetricEventStream",
+        "type": "GROUPBY",
+        "columns": [
+          "host"
+        ]
+      }
+    ]
+  },
+
+  {
+    "name": "nameNodeSafeModeCheckPolicy",
+    "description": "safe mode check for name node",
+    "definition": {
+      "inputStreams": [
+        "hadoopJmxMetricEventStream"
+      ],
+      "outputStreams": [
+        "tmp"
+      ],
+      "type": "siddhi",
+      "value": " from hadoopJmxMetricEventStream[component==\"namenode\" and metric == \"hadoop.namenode.fsnamesystemstate.fsstate\" and convert(value, \"long\") > 0]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp;  "
+    },
+    "partitionSpec": [
+      {
+        "streamId": "hadoopJmxMetricEventStream",
+        "type": "GROUPBY",
+        "columns": [
+          "host"
+        ]
+      }
+    ]
+  }
+]
\ No newline at end of file
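
Each entry above carries a Siddhi query in "definition.value" that reads from
hadoopJmxMetricEventStream and inserts matches into the "tmp" output stream. A quick,
engine-independent way to sanity-check one of these expressions is to run it on a bare
Siddhi 3.x runtime; the sketch below is illustrative only: the class name and sample
event are made up, the Siddhi version is assumed to be the one this module already
depends on, and the stream schema is taken from streamdefinitions.json further down
(host, timestamp, metric, component, site, value).

    import org.wso2.siddhi.core.ExecutionPlanRuntime;
    import org.wso2.siddhi.core.SiddhiManager;
    import org.wso2.siddhi.core.event.Event;
    import org.wso2.siddhi.core.stream.input.InputHandler;
    import org.wso2.siddhi.core.stream.output.StreamCallback;

    public class MissingBlockPolicyCheck {
        public static void main(String[] args) throws Exception {
            // Stream schema mirrors streamdefinitions.json; the query text is the
            // "value" field of missingBlockPolicy above.
            String plan =
                "define stream hadoopJmxMetricEventStream (host string, timestamp long, "
                + "metric string, component string, site string, value double); "
                + "from hadoopJmxMetricEventStream[metric == \"hadoop.namenode.dfs.missingblocks\" "
                + "and convert(value, \"long\") > 0]#window.externalTime(timestamp, 10 min) "
                + "select metric, host, value, timestamp, component, site insert into tmp;";

            SiddhiManager manager = new SiddhiManager();
            ExecutionPlanRuntime runtime = manager.createExecutionPlanRuntime(plan);
            runtime.addCallback("tmp", new StreamCallback() {
                @Override
                public void receive(Event[] events) {
                    for (Event e : events) {
                        System.out.println("alert candidate: " + e);  // one event per match
                    }
                }
            });
            InputHandler in = runtime.getInputHandler("hadoopJmxMetricEventStream");
            runtime.start();
            // Sample event with a non-zero missing block count, which should match the filter.
            in.send(new Object[]{"nn-host-1", System.currentTimeMillis(),
                    "hadoop.namenode.dfs.missingblocks", "namenode", "sandbox", 3.0d});
            Thread.sleep(1000);
            runtime.shutdown();
        }
    }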

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/publishments.json b/eagle-hadoop-metric/src/main/resources/publishments.json
new file mode 100644
index 0000000..10d16a4
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/resources/publishments.json
@@ -0,0 +1,33 @@
+[
+  {
+    "name":"jmxStreamOutput_email",
+    "type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+    "policyIds": [
+      "perfmon_cpu_host_check", "perfmon_cpu_pool_check"
+    ],
+    "properties": {
+      "subject":"Hadoop Jmx Metrics Alert",
+      "template":"",
+      "sender": "sender@corp.com",
+      "recipients": "services@corp.com",
+      "smtp.server":"smtp.mailhost.com",
+      "connection": "plaintext",
+      "smtp.port": "25"
+    },
+    "dedupIntervalMin" : "PT1M",
+    "serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+  },
+  {
+    "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+    "name":"jmxStreamOutput_kafka",
+    "policyIds": [
+      "perfmon_cpu_host_check"
+    ],
+    "dedupIntervalMin": "PT1M",
+    "properties":{
+      "kafka_broker":"localhost:9092",
+      "topic":"jmx_alerts"
+    },
+    "serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+  }
+]
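
The Kafka publisher above sends alerts to topic "jmx_alerts" on broker localhost:9092,
serialized as plain strings. To confirm alerts actually reach the topic after the app is
deployed, a throwaway consumer along the following lines is enough; the class name, group
id, and poll timeout are arbitrary, and a standard kafka-clients dependency on the
classpath is assumed.

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class JmxAlertTopicPeek {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");    // kafka_broker in publishments.json
            props.put("group.id", "jmx-alert-peek");             // arbitrary consumer group
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("auto.offset.reset", "earliest");
            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                consumer.subscribe(Collections.singletonList("jmx_alerts"));  // topic in publishments.json
                ConsumerRecords<String, String> records = consumer.poll(10000L);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());           // serialized alert event
                }
            }
        }
    }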

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/safemodecheck-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/safemodecheck-policy-import.sh b/eagle-hadoop-metric/src/main/resources/safemodecheck-policy-import.sh
deleted file mode 100644
index 32a6bee..0000000
--- a/eagle-hadoop-metric/src/main/resources/safemodecheck-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: safeModePolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "safeModePolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[component==\\\"namenode\\\" and metric == \\\"hadoop.namenode.fsnamesystemstate.fsstate\\\" and convert(value, \\\"long\\\") > 0]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/sanbox/capacityused-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/sanbox/capacityused-policy-import.sh b/eagle-hadoop-metric/src/main/resources/sanbox/capacityused-policy-import.sh
deleted file mode 100755
index ad52275..0000000
--- a/eagle-hadoop-metric/src/main/resources/sanbox/capacityused-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: capacityUsedPolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "capacityUsedPolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[metric == \\\"hadoop.namenode.fsnamesystemstate.capacityused\\\" and convert(value, \\\"long\\\") > 0]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/sanbox/hastate-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/sanbox/hastate-policy-import.sh b/eagle-hadoop-metric/src/main/resources/sanbox/hastate-policy-import.sh
deleted file mode 100755
index 7801c09..0000000
--- a/eagle-hadoop-metric/src/main/resources/sanbox/hastate-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: haStatePolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "haStatePolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from every a = hadoopJmxMetricEventStream[metric==\\\"hadoop.namenode.fsnamesystem.hastate\\\"] -> b = hadoopJmxMetricEventStream[metric==a.metric and b.host == a.host and (convert(a.value, \\\"long\\\") != convert(value, \\\"long\\\"))] within 10 min select a.host, a.value as oldHaState, b.value as newHaState, b.timestamp as timestamp, b.metric as metric, b.component as component, b.site as site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/sanbox/lastcheckpointtime-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/sanbox/lastcheckpointtime-policy-import.sh b/eagle-hadoop-metric/src/main/resources/sanbox/lastcheckpointtime-policy-import.sh
deleted file mode 100755
index d3811aa..0000000
--- a/eagle-hadoop-metric/src/main/resources/sanbox/lastcheckpointtime-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: lastCheckPointTimePolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "lastCheckPointTimePolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[metric == \\\"hadoop.namenode.dfs.lastcheckpointtime\\\" and (convert(value, \\\"long\\\") + 18000000) < timestamp]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"last check point time lag found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/sanbox/missingblock-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/sanbox/missingblock-policy-import.sh b/eagle-hadoop-metric/src/main/resources/sanbox/missingblock-policy-import.sh
deleted file mode 100755
index be51597..0000000
--- a/eagle-hadoop-metric/src/main/resources/sanbox/missingblock-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: missingBlockPolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "missingBlockPolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[metric == \\\"hadoop.namenode.dfs.missingblocks\\\" and convert(value, \\\"long\\\") > 0]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/sanbox/namenodehanoactive-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/sanbox/namenodehanoactive-policy-import.sh b/eagle-hadoop-metric/src/main/resources/sanbox/namenodehanoactive-policy-import.sh
deleted file mode 100755
index 33a7210..0000000
--- a/eagle-hadoop-metric/src/main/resources/sanbox/namenodehanoactive-policy-import.sh
+++ /dev/null
@@ -1,52 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-
-echo ""
-echo "Importing policy: NamenodeHAHasNoActive "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "NamenodeHAHasNoActive",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[metric == \\\"hadoop.namenode.hastate.active.count\\\" and value == 0 ]select * insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/sanbox/namenodehawithmorethanoneactive-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/sanbox/namenodehawithmorethanoneactive-policy-import.sh b/eagle-hadoop-metric/src/main/resources/sanbox/namenodehawithmorethanoneactive-policy-import.sh
deleted file mode 100755
index 2afa09e..0000000
--- a/eagle-hadoop-metric/src/main/resources/sanbox/namenodehawithmorethanoneactive-policy-import.sh
+++ /dev/null
@@ -1,53 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-
-echo ""
-echo "Importing policy: NamenodeHAWithMoreThanOneActive "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "NamenodeHAWithMoreThanOneActive",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[metric == \\\"hadoop.namenode.hastate.active.count\\\" and value > 1]select * insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/sanbox/namenodelag-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/sanbox/namenodelag-policy-import.sh b/eagle-hadoop-metric/src/main/resources/sanbox/namenodelag-policy-import.sh
deleted file mode 100755
index 2ccd37e..0000000
--- a/eagle-hadoop-metric/src/main/resources/sanbox/namenodelag-policy-import.sh
+++ /dev/null
@@ -1,49 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-echo ""
-echo "Importing Policy: NameNodeLagPolicy"
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "NameNodeLagPolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from every a = hadoopJmxMetricEventStream[metric==\\\"hadoop.namenode.journaltransaction.lastappliedorwrittentxid\\\"] -> b = hadoopJmxMetricEventStream[metric==a.metric and b.host != a.host and (max(convert(a.value, \\\"long\\\")) + 100) <= max(convert(value, \\\"long\\\"))] within 5 min select a.host as hostA, a.value as transactIdA, b.host as hostB, b.value as transactIdB insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"name node lag found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/sanbox/nodecount-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/sanbox/nodecount-policy-import.sh b/eagle-hadoop-metric/src/main/resources/sanbox/nodecount-policy-import.sh
deleted file mode 100755
index 90e0114..0000000
--- a/eagle-hadoop-metric/src/main/resources/sanbox/nodecount-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: dataNodeCountPolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "dataNodeCountPolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from every (e1 = hadoopJmxMetricEventStream[metric == \\\"hadoop.namenode.fsnamesystemstate.numlivedatanodes\\\" ]) -> e2 = hadoopJmxMetricEventStream[metric == e1.metric and host == e1.host and (convert(e1.value, \\\"long\\\") - 5) >= convert(value, \\\"long\\\") ] within 5 min select e1.metric, e1.host, e1.value as highNum, e1.timestamp as start, e2.value as lowNum, e2.timestamp as end insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"node count joggling found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/sanbox/resourcemanagerhanoactive-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/sanbox/resourcemanagerhanoactive-policy-import.sh b/eagle-hadoop-metric/src/main/resources/sanbox/resourcemanagerhanoactive-policy-import.sh
deleted file mode 100755
index 95b9c35..0000000
--- a/eagle-hadoop-metric/src/main/resources/sanbox/resourcemanagerhanoactive-policy-import.sh
+++ /dev/null
@@ -1,53 +0,0 @@
-#
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-
-echo ""
-echo "Importing policy: ResourceManagerHAHasNoActive "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "ResourceManagerHAHasNoActive",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[metric == \\\"hadoop.resourcemanager.hastate.active.count\\\" and value == 0 ]select * insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/sanbox/resourcemanagerhawithmorethanoneactive-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/sanbox/resourcemanagerhawithmorethanoneactive-policy-import.sh b/eagle-hadoop-metric/src/main/resources/sanbox/resourcemanagerhawithmorethanoneactive-policy-import.sh
deleted file mode 100755
index 269003c..0000000
--- a/eagle-hadoop-metric/src/main/resources/sanbox/resourcemanagerhawithmorethanoneactive-policy-import.sh
+++ /dev/null
@@ -1,54 +0,0 @@
-#
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-
-echo ""
-echo "Importing policy: ResourceManagerHAHasMoreThanOneActive "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "ResourceManagerHAHasMoreThanOneActive",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine",
-         "description":" Resource Manager HA Has More than one Active"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[metric == \\\"hadoop.resourcemanager.hastate.active.count\\\" and value > 1 ]select * insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/sanbox/safemodecheck-policy-import.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/sanbox/safemodecheck-policy-import.sh b/eagle-hadoop-metric/src/main/resources/sanbox/safemodecheck-policy-import.sh
deleted file mode 100755
index 8f1d14f..0000000
--- a/eagle-hadoop-metric/src/main/resources/sanbox/safemodecheck-policy-import.sh
+++ /dev/null
@@ -1,51 +0,0 @@
-#!/bin/bash
-
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-source $(dirname $0)/eagle-env.sh
-source $(dirname $0)/hadoop-metric-init.sh
-
-
-##### add policies ##########
-echo ""
-echo "Importing policy: safeModePolicy "
-curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
- "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertDefinitionService" \
- -d '
- [
-     {
-       "prefix": "alertdef",
-       "tags": {
-         "site": "sandbox",
-         "application": "hadoopJmxMetricDataSource",
-         "policyId": "safeModePolicy",
-         "alertExecutorId": "hadoopJmxMetricAlertExecutor",
-         "policyType": "siddhiCEPEngine"
-       },
-       "description": "jmx metric ",
-       "policyDef": "{\"expression\":\"from hadoopJmxMetricEventStream[component==\\\"namenode\\\" and metric == \\\"hadoop.namenode.fsnamesystemstate.fsstate\\\" and convert(value, \\\"long\\\") > 0]#window.externalTime(timestamp ,10 min) select metric, host, value, timestamp, component, site insert into tmp; \",\"type\":\"siddhiCEPEngine\"}",
-       "enabled": true,
-       "dedupeDef": "{\"alertDedupIntervalMin\":10,\"emailDedupIntervalMin\":10}",
-       "notificationDef": "[{\"notificationType\":\"eagleStore\"},{\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"missing block found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]"
-     }
- ]
- '
-
- ## Finished
-echo ""
-echo "Finished initialization for eagle topology"
-
-exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/main/resources/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/streamdefinitions.json b/eagle-hadoop-metric/src/main/resources/streamdefinitions.json
new file mode 100644
index 0000000..66a2c98
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/resources/streamdefinitions.json
@@ -0,0 +1,47 @@
+[
+  {
+    "streamId": "hadoopJmxMetricEventStream",
+    "dataSource": "hadoop_jmx_datasource",
+    "description": "the data stream for hadoop jmx metrics",
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "host",
+        "type": "STRING",
+        "defaultValue": "",
+        "required": true
+      },
+      {
+        "name": "timestamp",
+        "type": "LONG",
+        "defaultValue": 0,
+        "required": true
+      },
+      {
+        "name": "metric",
+        "type": "STRING",
+        "defaultValue": "default_hadoop_jmx_metric_name",
+        "required": true
+      },
+      {
+        "name": "component",
+        "type": "STRING",
+        "defaultValue": "namenode",
+        "required": true
+      },
+      {
+        "name": "site",
+        "type": "STRING",
+        "defaultValue": "hadoop",
+        "required": true
+      },
+      {
+        "name": "value",
+        "type": "DOUBLE",
+        "defaultValue": 0.0,
+        "required": true
+      }
+    ]
+  }
+]
\ No newline at end of file
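
The column order declared here (host, timestamp, metric, component, site, value) is the
shape the alert engine expects for each hadoopJmxMetricEventStream event, with "value"
typed as DOUBLE even though collectors typically ship it as a string. As a rough sketch
of mapping a raw collector message onto that order: the payload shape follows the sample
in the removed deserializer test below, Jackson is assumed for JSON parsing, and the
class itself is hypothetical.

    import java.util.Map;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class JmxMetricEventMapper {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        // Order matches streamdefinitions.json: host, timestamp, metric, component, site, value.
        @SuppressWarnings("unchecked")
        public static Object[] toStreamEvent(String json) throws Exception {
            Map<String, Object> m = MAPPER.readValue(json, Map.class);
            return new Object[]{
                    m.get("host"),
                    ((Number) m.get("timestamp")).longValue(),
                    m.get("metric"),
                    m.get("component"),
                    m.get("site"),
                    Double.parseDouble(String.valueOf(m.get("value")))  // string in, DOUBLE column out
            };
        }

        public static void main(String[] args) throws Exception {
            String sample = "{\"host\": \"hostname-1\", \"timestamp\": 1453208956395, "
                    + "\"metric\": \"hadoop.namenode.dfs.missingblocks\", "
                    + "\"component\": \"namenode\", \"site\": \"sandbox\", \"value\": \"1\"}";
            System.out.println(java.util.Arrays.toString(toStreamEvent(sample)));
        }
    }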

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1517c132/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializerTest.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializerTest.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializerTest.java
deleted file mode 100644
index 4c7fe6d..0000000
--- a/eagle-hadoop-metric/src/test/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricDeserializerTest.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.hadoop.metric;
-
-import org.junit.Test;
-
-/**
- * Created on 1/19/16.
- */
-public class HadoopJmxMetricDeserializerTest {
-    @Test
-    public void test() {
-//        HadoopJmxMetricDeserializer des = new HadoopJmxMetricDeserializer(null);
-//
-//        String m = "{\"host\": \"hostname-1\", \"timestamp\": 1453208956395, \"metric\": \"hadoop.namenode.dfs.lastwrittentransactionid\", \"component\": \"namenode\", \"site\": \"sandbox\", \"value\": \"49716\"}";
-//        Object obj = des.deserialize(m.getBytes());
-//        Assert.assertTrue(obj instanceof Map);
-//        Map<String, Object> metric = (Map<String, Object>) obj;
-//        Assert.assertEquals("hostname-1" ,metric.get("host"));
-//        Assert.assertEquals(1453208956395l ,metric.get("timestamp"));
-    }
-}