You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/08 00:46:15 UTC

[1/3] incubator-eagle git commit: HBase audit monitoring with new app framework https://issues.apache.org/jira/browse/EAGLE-420 Author: Yong Zhang Reviewer: Hao Chen

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop 1d842563a -> 660bfbd3f


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/metadata.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/metadata.xml b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/metadata.xml
new file mode 100644
index 0000000..665b5d7
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/metadata.xml
@@ -0,0 +1,243 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ /*
+  ~  * Licensed to the Apache Software Foundation (ASF) under one or more
+  ~  * contributor license agreements.  See the NOTICE file distributed with
+  ~  * this work for additional information regarding copyright ownership.
+  ~  * The ASF licenses this file to You under the Apache License, Version 2.0
+  ~  * (the "License"); you may not use this file except in compliance with
+  ~  * the License.  You may obtain a copy of the License at
+  ~  * <p/>
+  ~  * http://www.apache.org/licenses/LICENSE-2.0
+  ~  * <p/>
+  ~  * Unless required by applicable law or agreed to in writing, software
+  ~  * distributed under the License is distributed on an "AS IS" BASIS,
+  ~  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  * See the License for the specific language governing permissions and
+  ~  * limitations under the License.
+  ~  */
+  -->
+
+<application>
+    <type>HBaseAuditLogApplication</type>
+    <name>HBase Audit Log Monitoring Application</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.security.hbase.HBaseAuditLogApplication</appClass>
+    <viewPath>/apps/example</viewPath>
+    <configuration>
+        <property>
+            <name>dataSourceConfig.topic</name>
+            <displayName>dataSourceConfig.topic</displayName>
+            <value>sandbox_hbase_audit_log</value>
+            <description>data source topic</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkConnection</name>
+            <displayName>dataSourceConfig.zkConnection</displayName>
+            <value>localhost</value>
+            <description>zk connection</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkConnectionTimeoutMS</name>
+            <displayName>dataSourceConfig.zkConnectionTimeoutMS</displayName>
+            <value>15000</value>
+            <description>zk connection timeout in milliseconds</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.fetchSize</name>
+            <displayName>dataSourceConfig.fetchSize</displayName>
+            <value>1048586</value>
+            <description>kafka fetch size</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKServers</name>
+            <displayName>dataSourceConfig.transactionZKServers</displayName>
+            <value>localhost</value>
+            <description>zookeeper server for offset transaction</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKPort</name>
+            <displayName>dataSourceConfig.transactionZKPort</displayName>
+            <value>2181</value>
+            <description>zookeeper server port for offset transaction</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKRoot</name>
+            <displayName>dataSourceConfig.transactionZKRoot</displayName>
+            <value>/consumers</value>
+            <description>offset transaction root</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.consumerGroupId</name>
+            <displayName>dataSourceConfig.consumerGroupId</displayName>
+            <value>eagle.hbaseaudit.consumer</value>
+            <description>kafka consumer group Id</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionStateUpdateMS</name>
+            <displayName>dataSourceConfig.transactionStateUpdateMS</displayName>
+            <value>2000</value>
+            <description>zk transaction state update interval in milliseconds</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.schemeCls</name>
+            <displayName>dataSourceConfig.schemeCls</displayName>
+            <value>storm.kafka.StringScheme</value>
+            <description>scheme class</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKPort</name>
+            <displayName>dataSourceConfig.transactionZKPort</displayName>
+            <value>2181</value>
+            <description>zookeeper server port for offset transaction</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKPort</name>
+            <displayName>dataSourceConfig.transactionZKPort</displayName>
+            <value>2181</value>
+            <description>zookeeper server port for offset transaction</description>
+        </property>
+        <property>
+            <name>topology.numOfSpoutTasks</name>
+            <displayName>topology.numOfSpoutTasks</displayName>
+            <value>2</value>
+            <description>number of spout tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfParserTasks</name>
+            <displayName>topology.numOfParserTasks</displayName>
+            <value>2</value>
+            <description>number of parser tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfJoinTasks</name>
+            <displayName>topology.numOfJoinTasks</displayName>
+            <value>2</value>
+            <description>number of external join tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfSinkTasks</name>
+            <displayName>topology.numOfSinkTasks</displayName>
+            <value>2</value>
+            <description>number of sink tasks</description>
+        </property>
+        <property>
+            <name>eagleProps.dataJoinPollIntervalSec</name>
+            <displayName>eagleProps.dataJoinPollIntervalSec</displayName>
+            <value>30</value>
+            <description>interval in seconds for polling</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.host</name>
+            <displayName>eagleProps.eagleService.host</displayName>
+            <value>localhost</value>
+            <description>eagle service host</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.port</name>
+            <displayName>eagleProps.eagleService.port</displayName>
+            <value>8080</value>
+            <description>eagle service port</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.username</name>
+            <displayName>eagleProps.eagleService.username</displayName>
+            <value>admin</value>
+            <description>eagle service username</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.password</name>
+            <displayName>eagleProps.eagleService.password</displayName>
+            <value>secret</value>
+            <description>eagle service password</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.topic</name>
+            <displayName>dataSinkConfig.topic</displayName>
+            <value>sandbox_hbase_audit_log_parsed</value>
+            <description>topic for kafka data sink</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.brokerList</name>
+            <displayName>dataSinkConfig.brokerList</displayName>
+            <value>sandbox.hortonworks.com:6667</value>
+            <description>kafka broker list</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.serializerClass</name>
+            <displayName>dataSinkConfig.serializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class Kafka message value</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.keySerializerClass</name>
+            <displayName>dataSinkConfig.keySerializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class Kafka message key</description>
+        </property>
+        <property>
+            <name>metadata.store</name>
+            <displayName>metadata.store</displayName>
+            <value>org.apache.eagle.security.service.InMemMetadataDaoImpl</value>
+            <description>implementation class for metadata store</description>
+        </property>
+    </configuration>
+    <streams>
+        <stream>
+            <streamId>hbase_audit_log_stream</streamId>
+            <description>HBase Audit Log Stream</description>
+            <validate>true</validate>
+            <timeseries>true</timeseries>
+            <columns>
+                <column>
+                    <name>action</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>host</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>status</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>timestamp</name>
+                    <type>long</type>
+                </column>
+            </columns>
+        </stream>
+    </streams>
+    <docs>
+        <install>
+# Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1
+
+# Step 2: Set up data collector to flow data into kafka topic
+
+./bin/logstash -f log_collector.conf
+
+## `log_collector.conf` sample as follows:
+
+input {
+
+}
+filter {
+
+}
+output{
+
+}
+
+# Step 3: start application
+
+# Step 4: monitor with featured portal or alert with policies
+        </install>
+        <uninstall>
+# Step 1: stop and uninstall application
+# Step 2: delete kafka topic named "${site}_example_source_topic"
+# Step 3: stop logstash
+        </uninstall>
+    </docs>
+</application>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..e96f225
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,35 @@
+#
+# /*
+#  * Licensed to the Apache Software Foundation (ASF) under one or more
+#  * contributor license agreements.  See the NOTICE file distributed with
+#  * this work for additional information regarding copyright ownership.
+#  * The ASF licenses this file to You under the Apache License, Version 2.0
+#  * (the "License"); you may not use this file except in compliance with
+#  * the License.  You may obtain a copy of the License at
+#  * <p/>
+#  * http://www.apache.org/licenses/LICENSE-2.0
+#  * <p/>
+#  * Unless required by applicable law or agreed to in writing, software
+#  * distributed under the License is distributed on an "AS IS" BASIS,
+#  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  * See the License for the specific language governing permissions and
+#  * limitations under the License.
+#  */
+#
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.security.hbase.HBaseAuditLogAppProvider

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf
index bd38c83..5990450 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf
@@ -14,8 +14,10 @@
 # limitations under the License.
 
 {
+  "appId" : "HBaseAuditLogApp",
+  "mode" : "LOCAL",
+  "siteId" : "testsite",
   "topology" : {
-    "localMode" : true,
     "numOfTotalWorkers" : 2,
     "numOfSpoutTasks" : 2,
     "numOfParserTasks" : 2,
@@ -39,7 +41,7 @@
     "dataJoinPollIntervalSec" : 30,
     "eagleService": {
       "host": "localhost",
-      "port": 58080
+      "port": 9090
       "username": "admin",
       "password": "secret"
     }
@@ -50,4 +52,4 @@
     "serializerClass" : "kafka.serializer.StringEncoder",
     "keySerializerClass" : "kafka.serializer.StringEncoder"
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf.bak
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf.bak b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf.bak
deleted file mode 100644
index 5c44574..0000000
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/application.conf.bak
+++ /dev/null
@@ -1,66 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-  "envContextConfig" : {
-    "env" : "storm",
-    "mode" : "local",
-    "topologyName" : "sandbox-hbaseSecurityLog-topology",
-    "stormConfigFile" : "security-auditlog-storm.yaml",
-    "parallelismConfig" : {
-      "kafkaMsgConsumer" : 1,
-      "hbaseSecurityLogAlertExecutor*" : 1
-    }
-  },
-  "dataSourceConfig": {
-    "topic" : "sandbox_hbase_security_log",
-    "zkConnection" : "sandbox.hortonworks.com:2181",
-    "zkConnectionTimeoutMS" : 15000,
-    "consumerGroupId" : "EagleConsumer",
-    "fetchSize" : 1048586,
-    "deserializerClass" : "org.apache.eagle.security.hbase.HbaseAuditLogKafkaDeserializer",
-    "transactionZKServers" : "sandbox.hortonworks.com",
-    "transactionZKPort" : 2181,
-    "transactionZKRoot" : "/consumers",
-    "consumerGroupId" : "eagle.hbasesecurity.consumer",
-    "transactionStateUpdateMS" : 2000
-  },
-  "alertExecutorConfigs" : {
-     "hbaseSecurityLogAlertExecutor" : {
-       "parallelism" : 1,
-       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-       "needValidation" : "true"
-     }
-  },
-  "eagleProps" : {
-    "site" : "sandbox",
-    "application": "hbaseSecurityLog",
-    "dataJoinPollIntervalSec" : 30,
-    "mailHost" : "mailHost.com",
-    "mailSmtpPort":"25",
-    "mailDebug" : "true",
-    "eagleService": {
-      "host": "localhost",
-      "port": 9098
-      "username": "admin",
-      "password": "secret"
-    }
-  },
-  "dynamicConfigSource" : {
-    "enabled" : true,
-    "initDelayMillis" : 0,
-    "delayMillis" : 30000
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
deleted file mode 100644
index 4b48c0a..0000000
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/metadata.xml
+++ /dev/null
@@ -1,91 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" ?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<application>
-    <type>HBaseAuditLogApplication</type>
-    <name>HBase Audit Log Monitoring Application</name>
-    <version>0.5.0-incubating</version>
-    <appClass>org.apache.eagle.security.hbase.HBaseAuditLogApplication</appClass>
-    <viewPath>/apps/example</viewPath>
-    <configuration>
-        <property>
-            <name>message</name>
-            <displayName>Message</displayName>
-            <value>Hello, example application!</value>
-            <description>Just an sample configuration property</description>
-        </property>
-    </configuration>
-    <streams>
-        <stream>
-            <streamId>hbase_audit_log_stream</streamId>
-            <description>HBase Audit Log Stream</description>
-            <validate>true</validate>
-            <timeseries>true</timeseries>
-            <columns>
-                <column>
-                    <name>action</name>
-                    <type>string</type>
-                </column>
-                <column>
-                    <name>host</name>
-                    <type>string</type>
-                </column>
-                <column>
-                    <name>status</name>
-                    <type>string</type>
-                </column>
-                <column>
-                    <name>timestamp</name>
-                    <type>long</type>
-                </column>
-            </columns>
-        </stream>
-    </streams>
-    <docs>
-        <install>
-# Step 1: Create source kafka topic named "${site}_example_source_topic"
-
-./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --replication 1
-
-# Step 2: Set up data collector to flow data into kafka topic in
-
-./bin/logstash -f log_collector.conf
-
-## `log_collector.conf` sample as following:
-
-input {
-
-}
-filter {
-
-}
-output{
-
-}
-
-# Step 3: start application
-
-# Step 4: monitor with featured portal or alert with policies
-        </install>
-        <uninstall>
-# Step 1: stop and uninstall application
-# Step 2: delete kafka topic named "${site}_example_source_topic"
-# Step 3: stop logstash
-        </uninstall>
-    </docs>
-</application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/resources/scripts
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/resources/scripts b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/scripts
new file mode 100644
index 0000000..7d04fdc
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/resources/scripts
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+./kafka-topics.sh --topic sandbox_hbase_audit_log --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2
+
+./kafka-topics.sh --topic sandbox_hbase_audit_log_parsed --zookeeper localhost:2181 --create --replication-factor 1 --partitions 2
+
+./kafka-console-producer.sh --topic sandbox_hbase_audit_log --broker-list sandbox.hortonworks.com:6667
+
+./kafka-console-consumer.sh --topic sandbox_hbase_audit_log_parsed --zookeeper sandbox.hortonworks.com:2181 --from-beginning

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java b/eagle-security/eagle-security-hbase-auditlog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java
deleted file mode 100644
index 6abc966..0000000
--- a/eagle-security/eagle-security-hbase-auditlog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.security.hbase;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import com.typesafe.config.ConfigSyntax;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.junit.Test;
-
-
-public class TestHbaseAuditLogProcessTopology {
-    @Test
-    public void test() throws Exception {
-        //Config baseConfig = ConfigFactory.load("eagle-scheduler.conf");
-        ConfigParseOptions options = ConfigParseOptions.defaults()
-                .setSyntax(ConfigSyntax.PROPERTIES)
-                .setAllowMissing(false);
-        String topoConfigStr = "web.hbase.zookeeper.property.clientPort=2181\nweb.hbase.zookeeper.quorum=sandbox.hortonworks.com\n\napp.envContextConfig.env=storm\napp.envContextConfig.mode=local\napp.dataSourceConfig.topic=sandbox_hbase_security_log\napp.dataSourceConfig.zkConnection=sandbox.hortonworks.com:2181\napp.dataSourceConfig.zkConnectionTimeoutMS=15000\napp.dataSourceConfig.brokerZkPath=/brokers\napp.dataSourceConfig.fetchSize=1048586\napp.dataSourceConfig.transactionZKServers=sandbox.hortonworks.com\napp.dataSourceConfig.transactionZKPort=2181\napp.dataSourceConfig.transactionZKRoot=/consumers\napp.dataSourceConfig.consumerGroupId=eagle.hbasesecurity.consumer\napp.dataSourceConfig.transactionStateUpdateMS=2000\napp.dataSourceConfig.deserializerClass=org.apache.eagle.security.hbase.HbaseAuditLogKafkaDeserializer\napp.eagleProps.site=sandbox\napp.eagleProps.application=hbaseSecurityLog\napp.eagleProps.dataJoinPollIntervalSec=30\napp.eagleProps.mailHost=mailHost.com\napp.eag
 leProps.mailSmtpPort=25\napp.eagleProps.mailDebug=true\napp.eagleProps.eagleService.host=localhost\napp.eagleProps.eagleService.port=9098\napp.eagleProps.eagleService.username=admin\napp.eagleProps.eagleService.password=secret";
-
-        Config topoConfig = ConfigFactory.parseString(topoConfigStr, options);
-        Config conf = topoConfig.getConfig(EagleConfigConstants.APP_CONFIG);
-
-        HbaseAuditLogMonitoringMain topology = new HbaseAuditLogMonitoringMain();
-        //topology.submit("", conf);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/SensitivityMetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/SensitivityMetadataResource.java b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/SensitivityMetadataResource.java
new file mode 100644
index 0000000..7b85c69
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/SensitivityMetadataResource.java
@@ -0,0 +1,64 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+package org.apache.eagle.service.security.hbase;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+import org.apache.eagle.security.service.HBaseSensitivityEntity;
+import org.apache.eagle.security.service.ISecurityMetadataDAO;
+import org.apache.eagle.security.service.MetadataDaoFactory;
+
+import javax.ws.rs.*;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * JAX-RS resource rooted at /metadata/sensitivity that lists and adds HBase sensitivity entities through an ISecurityMetadataDAO. Since 6/10/16.
+ */
+@Path("/metadata/sensitivity")
+@Singleton
+public class SensitivityMetadataResource {
+    private ApplicationEntityService entityService; // assigned in the constructor; not read anywhere in this class
+    private Config config; // assigned in the constructor; not read anywhere in this class
+    private ISecurityMetadataDAO dao; // backing store used by both endpoints below
+    @Inject
+    public SensitivityMetadataResource(ApplicationEntityService entityService, Config eagleServerConfig){
+        this.entityService = entityService;
+        this.config = eagleServerConfig;
+        String metadataStoreCls = eagleServerConfig.getString("metadata.store"); // DAO implementation class name is read from the metadata.store config key
+        dao = MetadataDaoFactory.getMetadataDAO(metadataStoreCls); // factory resolves the DAO from that class name
+    }
+
+    @Path("/hbase")
+    @GET
+    @Produces("application/json")
+    public Collection<HBaseSensitivityEntity> getHBaseSensitivites(@QueryParam("site") String site){ // NOTE(review): the site query parameter is accepted but never used, so the result is not filtered by site here
+        return dao.listHBaseSensitivies(); // returns whatever the DAO lists, unmodified
+    }
+
+    @Path("/hbase")
+    @POST
+    @Consumes("application/json")
+    public void addHBaseSensitivities(Collection<HBaseSensitivityEntity> list){ // accepts a JSON collection of entities in the request body
+        dao.addHBaseSensitivity(list); // hands the posted collection to the DAO unchanged; no validation happens in this method
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-server/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-server/pom.xml b/eagle-server/pom.xml
index 6617693..6083616 100644
--- a/eagle-server/pom.xml
+++ b/eagle-server/pom.xml
@@ -58,6 +58,10 @@
                     <groupId>com.sun.jersey</groupId>
                     <artifactId>*</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jersey.contribs</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -134,6 +138,16 @@
             <artifactId>eagle-jpm-app</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-security-hbase-auditlog</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-security-hbase-web</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
     <build>
         <resources>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-server/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf
index 19b87b2..ce75909 100644
--- a/eagle-server/src/main/resources/application.conf
+++ b/eagle-server/src/main/resources/application.conf
@@ -51,4 +51,4 @@
 			"nimbusThriftPort": 6627
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-server/src/main/resources/configuration.yml
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/configuration.yml b/eagle-server/src/main/resources/configuration.yml
new file mode 100644
index 0000000..c671ade
--- /dev/null
+++ b/eagle-server/src/main/resources/configuration.yml
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+server:
+  applicationConnectors:
+    - type: http
+      port: 9090
+  adminConnectors:
+    - type: http
+      port: 9091

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-server/src/test/java/org/apache/eagle/server/ServerApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-server/src/test/java/org/apache/eagle/server/ServerApplicationTest.java b/eagle-server/src/test/java/org/apache/eagle/server/ServerApplicationTest.java
index 1ef5127..823981f 100644
--- a/eagle-server/src/test/java/org/apache/eagle/server/ServerApplicationTest.java
+++ b/eagle-server/src/test/java/org/apache/eagle/server/ServerApplicationTest.java
@@ -21,6 +21,6 @@ import org.junit.Test;
 public class ServerApplicationTest {
     @Test
     public void testServerMain() throws Exception {
-        ServerMain.main(new String[]{"server"});
+        ServerMain.main(new String[]{"server", "src/test/resources/configuration.yml"});
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-server/src/test/resources/configuration.yml
----------------------------------------------------------------------
diff --git a/eagle-server/src/test/resources/configuration.yml b/eagle-server/src/test/resources/configuration.yml
new file mode 100644
index 0000000..c671ade
--- /dev/null
+++ b/eagle-server/src/test/resources/configuration.yml
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+server:
+  applicationConnectors:
+    - type: http
+      port: 9090
+  adminConnectors:
+    - type: http
+      port: 9091


[2/3] incubator-eagle git commit: HBase audit monitoring with new app framework https://issues.apache.org/jira/browse/EAGLE-420 Author: Yong Zhang Reviewer: Hao Chen

Posted by yo...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
index 7c64e50..60f49ef 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSinkProvider.java
@@ -1,46 +1,47 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.metadata.model.StreamSinkConfig;
-
-import java.lang.reflect.ParameterizedType;
-
-public interface StreamSinkProvider<S extends StreamSink<D>,D extends StreamSinkConfig>{
-    /**
-     * @param streamId
-     * @param appConfig
-     * @return
-     */
-    D getSinkConfig(String streamId, Configuration appConfig);
-    S getSink();
-
-    default S getSink(String streamId, Configuration appConfig){
-        S s = getSink();
-        s.init(streamId,getSinkConfig(streamId,appConfig));
-        return s;
-    }
-
-    default Class<? extends S> getSinkType(){
-        return (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
-    }
-
-    default Class<? extends D> getSinkConfigType(){
-        return (Class<D>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.metadata.model.StreamSinkConfig;
+
+import java.lang.reflect.ParameterizedType;
+
+public interface StreamSinkProvider<S extends StreamSink<D>,D extends StreamSinkConfig>{
+    /**
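+     * @param streamId identifier of the stream the sink configuration is built for
+     * @param config Typesafe {@code Config} supplied by the caller
+     * @return sink configuration that {@code getSink(streamId, config)} passes to {@code StreamSink#init}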
+     */
+    D getSinkConfig(String streamId, Config config);
+    S getSink();
+
+    default S getSink(String streamId, Config config){
+        S s = getSink();
+        s.init(streamId,getSinkConfig(streamId,config));
+        return s;
+    }
+
+    default Class<? extends S> getSinkType(){
+        return (Class<S>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+    }
+
+    default Class<? extends D> getSinkConfigType(){
+        return (Class<D>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[1];
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 20816db..bf1e587 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -128,4 +128,4 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme
     public ApplicationDesc getApplicationDesc() {
         return applicationDesc;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
index ace0c45..be84f0c 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/ApplicationProvider.java
@@ -34,4 +34,4 @@ public interface ApplicationProvider<T extends Application> {
      * @return application instance
      */
     T getApplication();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
index 2c686d9..1ef91ff 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ServerSimulatorImpl.java
@@ -54,8 +54,11 @@ public class ServerSimulatorImpl extends ServerSimulator {
         SiteEntity siteEntity = getUniqueSite();
         siteResource.createSite(siteEntity);
         Assert.assertNotNull(siteEntity.getUuid());
+        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL);
+        installOperation.setConfiguration(appConfig);
         // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL)).getData();
+        ApplicationEntity applicationEntity =
+                applicationResource.installApplication(installOperation).getData();
         // Start application
         applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
     }
@@ -86,4 +89,4 @@ public class ServerSimulatorImpl extends ServerSimulator {
             throw new IllegalStateException(e.getMessage(),e);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
index 0558454..f58f0aa 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestStormApplication.java
@@ -1,84 +1,90 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.spi.AbstractApplicationProvider;
-import org.junit.Ignore;
-
-import java.util.Arrays;
-import java.util.Map;
-
-@Ignore
-public class TestStormApplication extends StormApplication<TestStormApplication.TestStormAppConfig>{
-    @Override
-    public StormTopology execute(TestStormAppConfig config, StormEnvironment environment) {
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
-        builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
-        builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
-        return builder.createTopology();
-    }
-
-    public final static class TestStormAppConfig extends Configuration{
-        private int spoutNum = 1;
-
-        public int getSpoutNum() {
-            return spoutNum;
-        }
-
-        public void setSpoutNum(int spoutNum) {
-            this.spoutNum = spoutNum;
-        }
-    }
-
-    private class RandomEventSpout extends BaseRichSpout {
-        private SpoutOutputCollector _collector;
-        @Override
-        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
-            _collector = spoutOutputCollector;
-        }
-
-        @Override
-        public void nextTuple() {
-            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
-            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
-        }
-    }
-
-    public final static class Provider extends AbstractApplicationProvider<TestStormApplication> {
-        public Provider(){
-            super("TestApplicationMetadata.xml");
-        }
-        @Override
-        public TestStormApplication getApplication() {
-            return new TestStormApplication();
-        }
-    }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+import org.junit.Ignore;
+
+import java.util.Arrays;
+import java.util.Map;
+
+@Ignore
+public class TestStormApplication extends StormApplication<TestStormApplication.TestStormAppConfig>{
+    @Override
+    public StormTopology execute(TestStormAppConfig config, StormEnvironment environment){
+        return null;
+    }
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum"));
+        builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+        builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+        return builder.createTopology();
+    }
+
+    public final static class TestStormAppConfig extends Configuration{
+        private int spoutNum = 1;
+
+        public int getSpoutNum() {
+            return spoutNum;
+        }
+
+        public void setSpoutNum(int spoutNum) {
+            this.spoutNum = spoutNum;
+        }
+    }
+
+    private class RandomEventSpout extends BaseRichSpout {
+        private SpoutOutputCollector _collector;
+        @Override
+        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+            _collector = spoutOutputCollector;
+        }
+
+        @Override
+        public void nextTuple() {
+            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+        }
+    }
+
+    public final static class Provider extends AbstractApplicationProvider<TestStormApplication> {
+        public Provider(){
+            super("TestApplicationMetadata.xml");
+        }
+        @Override
+        public TestStormApplication getApplication() {
+            return new TestStormApplication();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
index 3db5f20..bbdfbfa 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplication.java
@@ -1,97 +1,94 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.storm;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.StormApplication;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-
-import java.util.Arrays;
-import java.util.Map;
-
-public class MockStormApplication extends StormApplication<MockStormApplication.MockStormConfiguration> {
-    private MockStormConfiguration appConfig;
-
-    @Override
-    public StormTopology execute(MockStormConfiguration config, StormEnvironment environment) {
-        this.setAppConfig(config);
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
-        builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
-        builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
-        return builder.createTopology();
-    }
-
-    public MockStormConfiguration getAppConfig() {
-        return appConfig;
-    }
-
-    private void setAppConfig(MockStormConfiguration appConfig) {
-        this.appConfig = appConfig;
-    }
-
-    /**
-     * TODO: Load configuration from name space in application className
-     * Application Configuration
-     */
-    static class MockStormConfiguration extends Configuration {
-        private int spoutNum = 1;
-        private boolean loaded = false;
-
-        public int getSpoutNum() {
-            return spoutNum;
-        }
-
-        public void setSpoutNum(int spoutNum) {
-            this.spoutNum = spoutNum;
-        }
-
-        public boolean isLoaded() {
-            return loaded;
-        }
-
-        public void setLoaded(boolean loaded) {
-            this.loaded = loaded;
-        }
-    }
-
-    private class RandomEventSpout extends BaseRichSpout {
-        private SpoutOutputCollector _collector;
-        @Override
-        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
-            _collector = spoutOutputCollector;
-        }
-
-        @Override
-        public void nextTuple() {
-            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
-            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
-            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
-        }
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.storm;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class MockStormApplication extends StormApplication<MockStormApplication.MockStormConfiguration> {
+    private MockStormConfiguration appConfig;
+
+    @Override
+    public StormTopology execute(MockStormConfiguration config, StormEnvironment environment) {
+        return null;
+    }
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum"));
+        builder.setBolt("sink_1",environment.getFlattenStreamSink("TEST_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+        builder.setBolt("sink_2",environment.getFlattenStreamSink("TEST_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
+        return builder.createTopology();
+    }
+
+    /**
+     * Application configuration for the mock Storm application.
+     * TODO: load this configuration from the namespace keyed by the application class name.
+     */
+    static class MockStormConfiguration extends Configuration {
+        private int spoutNum = 1;
+        private boolean loaded = false;
+
+        public int getSpoutNum() {
+            return spoutNum;
+        }
+
+        public void setSpoutNum(int spoutNum) {
+            this.spoutNum = spoutNum;
+        }
+
+        public boolean isLoaded() {
+            return loaded;
+        }
+
+        public void setLoaded(boolean loaded) {
+            this.loaded = loaded;
+        }
+    }
+
+    private class RandomEventSpout extends BaseRichSpout {
+        private SpoutOutputCollector _collector;
+        @Override
+        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+            _collector = spoutOutputCollector;
+        }
+
+        @Override
+        public void nextTuple() {
+            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
index 8e05b5e..32dae23 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/storm/MockStormApplicationTest.java
@@ -1,75 +1,51 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.storm;
-
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.app.utils.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.HashMap;
-
-public class MockStormApplicationTest {
-    @Test
-    public void testGetConfigClass(){
-        MockStormApplication mockStormApplication = new MockStormApplication();
-        Assert.assertEquals(MockStormApplication.MockStormConfiguration.class,mockStormApplication.getConfigType());
-    }
-
-    @Test
-    public void testGetConfigFromMap(){
-        MockStormApplication mockStormApplication = new MockStormApplication();
-        mockStormApplication.execute(new HashMap<String,Object>(){
-            {
-                put("spoutNum",1234);
-                put("loaded",true);
-                put("mode", ApplicationEntity.Mode.CLUSTER);
-            }
-        },new StormEnvironment(ConfigFactory.load()));
-        Assert.assertTrue(mockStormApplication.getAppConfig().isLoaded());
-        Assert.assertEquals(1234,mockStormApplication.getAppConfig().getSpoutNum());
-        Assert.assertEquals(ApplicationEntity.Mode.CLUSTER,mockStormApplication.getAppConfig().getMode());
-    }
-
-    @Test
-    public void testGetConfigFromEnvironmentConfigFile(){
-        MockStormApplication mockStormApplication = new MockStormApplication();
-        mockStormApplication.execute(new StormEnvironment(ConfigFactory.load()));
-        Assert.assertTrue(mockStormApplication.getAppConfig().isLoaded());
-        Assert.assertEquals(3,mockStormApplication.getAppConfig().getSpoutNum());
-        Assert.assertEquals(ApplicationEntity.Mode.LOCAL,mockStormApplication.getAppConfig().getMode());
-    }
-
-    @Test
-    public void testRunApplicationWithSysConfig(){
-        new MockStormApplication().run();
-    }
-
-    @Test
-    public void testRunApplicationWithAppConfig() throws InterruptedException {
-        MockStormApplication.MockStormConfiguration appConfig = new MockStormApplication.MockStormConfiguration();
-        appConfig.setJarPath(DynamicJarPathFinder.findPath(MockStormApplication.class));
-        appConfig.setSiteId("test_site");
-        appConfig.setAppId("test_application_storm_topology");
-        appConfig.setMode(ApplicationEntity.Mode.LOCAL);
-        appConfig.setLoaded(true);
-        appConfig.setSpoutNum(4);
-        new MockStormApplication().run(appConfig);
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.storm;
+
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class MockStormApplicationTest {
+    @Test
+    public void testGetConfigClass(){
+        MockStormApplication mockStormApplication = new MockStormApplication();
+        Assert.assertEquals(MockStormApplication.MockStormConfiguration.class,mockStormApplication.getConfigType());
+    }
+
+    @Test
+    public void testRunApplicationWithSysConfig(){
+        new MockStormApplication().run();
+    }
+
+    @Test
+    public void testRunApplicationWithAppConfig() throws InterruptedException {
+        MockStormApplication.MockStormConfiguration appConfig = new MockStormApplication.MockStormConfiguration();
+        appConfig.setJarPath(DynamicJarPathFinder.findPath(MockStormApplication.class));
+        appConfig.setSiteId("test_site");
+        appConfig.setAppId("test_application_storm_topology");
+        appConfig.setMode(ApplicationEntity.Mode.LOCAL);
+        appConfig.setLoaded(true);
+        appConfig.setSpoutNum(4);
+        new MockStormApplication().run(appConfig);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
index 9f154f9..64f0974 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
@@ -52,10 +52,15 @@
 		}
 	}
 
-	"org.apache.eagle.app.storm.MockStormApplication": {
-		"spoutNum": 3
-		"loaded": true
-		"mode":"LOCAL",
-		"appId":"test_topology_name"
+	"appId":"test_topology_name"
+	"spoutNum": 3
+	"loaded": true
+	"mode":"LOCAL"
+
+	"dataSinkConfig": {
+		"topic" : "test_topic",
+		"brokerList" : "sandbox.hortonworks.com:6667",
+		"serializerClass" : "kafka.serializer.StringEncoder",
+		"keySerializerClass" : "kafka.serializer.StringEncoder"
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
index 6e4521d..c651b6b 100644
--- a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
@@ -21,7 +21,6 @@ package org.apache.eagle.service.application;
 
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.service.application.dao.ApplicationManagerDAO;
 import org.apache.eagle.service.application.dao.ApplicationManagerDaoImpl;
 import org.apache.eagle.service.application.entity.TopologyExecutionStatus;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
index edb9fc0..2b37d25 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntity.java
@@ -111,11 +111,11 @@ public class GenericServiceAPIResponseEntity<T>{
 	public String getException() {
 		return exception;
 	}
-	public void setException(String exception) {
-		this.exception = exception;
-	}
+//	public void setException(String exception) {
+//		this.exception = exception;
+//	}
 
-    public void setException(Exception exception){
-        if(exception!=null) this.exception = EagleExceptionWrapper.wrap(exception);
+    public void setException(Exception exceptionObj){
+        if(exceptionObj!=null) this.exception = EagleExceptionWrapper.wrap(exceptionObj); // guard the argument being wrapped, not the String field it is stored into
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
index 85b875d..940ee8a 100644
--- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
+++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/GenericServiceAPIResponseEntityDeserializer.java
@@ -54,7 +54,8 @@ public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserialize
                 else if(SUCCESS_FIELD.equals(field.getKey()) && field.getValue() != null){
                     entity.setSuccess(field.getValue().getValueAsBoolean(false));
                 }else if(EXCEPTION_FIELD.equals(field.getKey()) && field.getValue() != null){
-                    entity.setException(field.getValue().getTextValue());
+//                    entity.setException(field.getValue().getTextValue());
+                    entity.setException(new Exception(field.getValue().getTextValue()));
                 }else if(TYPE_FIELD.endsWith(field.getKey())  && field.getValue() != null){
                     try {
                         entity.setType(Class.forName(field.getValue().getTextValue()));
@@ -81,4 +82,4 @@ public class GenericServiceAPIResponseEntityDeserializer extends JsonDeserialize
         }
         return entity;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
index fb52352..b7f27f5 100644
--- a/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
+++ b/eagle-core/eagle-query/eagle-service-base/src/main/java/org/apache/eagle/service/generic/GenericEntityServiceResource.java
@@ -439,7 +439,7 @@ public class GenericEntityServiceResource {
                 LOG.error("Data storage is null");
                 throw new IllegalDataStorageException("data storage is null");
             }
-            
+
             QueryResult<?> result = queryStatement.execute(dataStorage);
             if(result.isSuccess()){
                 meta.put(FIRST_TIMESTAMP, result.getFirstTimestamp());
@@ -543,7 +543,7 @@ public class GenericEntityServiceResource {
                 LOG.error("Data storage is null");
                 throw new IllegalDataStorageException("Data storage is null");
             }
-            
+
             DeleteStatement deleteStatement = new DeleteStatement(rawQuery);
             ModifyResult<String> deleteResult = deleteStatement.execute(dataStorage);
             if(deleteResult.isSuccess()){
@@ -627,4 +627,4 @@ public class GenericEntityServiceResource {
         }
         return response;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
index 6819e59..d4c0e0c 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleStormApplication.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
 import backtype.storm.utils.Utils;
+import com.typesafe.config.Config;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 
@@ -33,8 +34,13 @@ import java.util.Map;
 public class ExampleStormApplication extends StormApplication<ExampleStormConfig> {
     @Override
     public StormTopology execute(ExampleStormConfig config, StormEnvironment environment) {
+        return null;
+    }
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
         TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("metric_spout", new RandomEventSpout(), config.getSpoutNum());
+        builder.setSpout("metric_spout", new RandomEventSpout(), config.getInt("spoutNum"));
         builder.setBolt("sink_1",environment.getFlattenStreamSink("SAMPLE_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
         builder.setBolt("sink_2",environment.getFlattenStreamSink("SAMPLE_STREAM_2",config)).fieldsGrouping("metric_spout",new Fields("metric"));
         return builder.createTopology();
@@ -59,4 +65,4 @@ public class ExampleStormApplication extends StormApplication<ExampleStormConfig
             outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
index 45dd7bd..88dd02d 100644
--- a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
+++ b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationProviderTest.java
@@ -26,10 +26,13 @@ import org.apache.eagle.metadata.model.ApplicationEntity;
 import org.apache.eagle.metadata.model.SiteEntity;
 import org.apache.eagle.metadata.resource.SiteResource;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 @RunWith(AppUnitTestRunner.class)
 public class ExampleApplicationProviderTest {
@@ -54,6 +57,7 @@ public class ExampleApplicationProviderTest {
      * @throws InterruptedException
      */
     @Test
+    @Ignore
     public void testApplicationLifecycle() throws InterruptedException {
         // Create local site
         SiteEntity siteEntity = new SiteEntity();
@@ -63,8 +67,10 @@ public class ExampleApplicationProviderTest {
         siteResource.createSite(siteEntity);
         Assert.assertNotNull(siteEntity.getUuid());
 
+        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL);
+        installOperation.setConfiguration(getConf());
         // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL)).getData();
+        ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
         // Start application
         applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
         // Stop application
@@ -81,16 +87,29 @@ public class ExampleApplicationProviderTest {
 
     @Test
     public void testApplicationQuickRunWithAppType(){
-        simulator.start("EXAMPLE_APPLICATION");
+        simulator.start("EXAMPLE_APPLICATION", getConf());
     }
 
+    @Ignore
     @Test
-    public void testApplicationQuickRunWithAppProvider(){
-        simulator.start(ExampleApplicationProvider.class);
+    public void testApplicationQuickRunWithAppProvider() throws Exception{
+        simulator.start(ExampleApplicationProvider.class, getConf());
     }
 
+    @Ignore
     @Test
-    public void testApplicationQuickRunWithAppProvider2(){
-        simulator.start(ExampleApplicationProvider2.class);
+    public void testApplicationQuickRunWithAppProvider2() throws Exception{
+        simulator.start(ExampleApplicationProvider2.class, getConf());
     }
-}
\ No newline at end of file
+
+    private Map<String, Object> getConf(){
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("dataSinkConfig.topic", "testTopic");
+        conf.put("dataSinkConfig.brokerList", "broker");
+        conf.put("dataSinkConfig.serializerClass", "serializerClass");
+        conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+        conf.put("spoutNum", 2);
+        conf.put("mode", "LOCAL");
+        return conf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-examples/eagle-app-example/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/resources/application.conf b/eagle-examples/eagle-app-example/src/test/resources/application.conf
index bfbf20e..2873037 100644
--- a/eagle-examples/eagle-app-example/src/test/resources/application.conf
+++ b/eagle-examples/eagle-app-example/src/test/resources/application.conf
@@ -56,7 +56,15 @@
 		}
 	},
 
-	"org.apache.eagle.app.example.ExampleStormApplication": {
 		"appId": "unit_test_example_app"
+	"spoutNum": 3
+	"loaded": true
+	"mode":"LOCAL"
+
+	"dataSinkConfig": {
+		"topic" : "test_topic",
+		"brokerList" : "sandbox.hortonworks.com:6667",
+		"serializerClass" : "kafka.serializer.StringEncoder",
+		"keySerializerClass" : "kafka.serializer.StringEncoder"
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
index 89e5433..25506cc 100644
--- a/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
+++ b/eagle-jpm/eagle-jpm-app/src/main/java/org/apache/eagle/app/jpm/JPMApplication.java
@@ -23,6 +23,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 
@@ -32,6 +33,11 @@ import java.util.Map;
 public class JPMApplication extends StormApplication<JPMConfiguration> {
     @Override
     public StormTopology execute(JPMConfiguration config, StormEnvironment environment) {
+        return null;
+    }
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout("metric_spout", new RandomEventSpout(), 4);
         builder.setBolt("sink_1",environment.getFlattenStreamSink("SAMPLE_STREAM_1",config)).fieldsGrouping("metric_spout",new Fields("metric"));
@@ -57,4 +63,4 @@ public class JPMApplication extends StormApplication<JPMConfiguration> {
             outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java b/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
index 30bbc96..c955d36 100644
--- a/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
+++ b/eagle-jpm/eagle-jpm-app/src/test/java/org/apache/eagle/app/jpm/JPMApplicationTest.java
@@ -27,6 +27,9 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.util.HashMap;
+import java.util.Map;
+
 @RunWith(AppUnitTestRunner.class)
 public class JPMApplicationTest {
     @Inject
@@ -53,8 +56,11 @@ public class JPMApplicationTest {
         siteResource.createSite(siteEntity);
         Assert.assertNotNull(siteEntity.getUuid());
 
+        ApplicationOperations.InstallOperation installOperation = new ApplicationOperations.InstallOperation("test_site","JPM_APP", ApplicationEntity.Mode.LOCAL);
+        installOperation.setConfiguration(getConf());
+
         // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","JPM_APP", ApplicationEntity.Mode.LOCAL)).getData();
+        ApplicationEntity applicationEntity = applicationResource.installApplication(installOperation).getData();
         // Start application
         applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
         // Stop application
@@ -68,4 +74,15 @@ public class JPMApplicationTest {
             // Expected exception
         }
     }
+
+    private Map<String, Object> getConf(){
+        Map<String, Object> conf = new HashMap<>();
+        conf.put("dataSinkConfig.topic", "testTopic");
+        conf.put("dataSinkConfig.brokerList", "broker");
+        conf.put("dataSinkConfig.serializerClass", "serializerClass");
+        conf.put("dataSinkConfig.keySerializerClass", "keySerializerClass");
+        conf.put("spoutNum", 2);
+        conf.put("mode", "LOCAL");
+        return conf;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/pom.xml b/eagle-security/eagle-security-common/pom.xml
index 83971f2..18f9bd0 100644
--- a/eagle-security/eagle-security-common/pom.xml
+++ b/eagle-security/eagle-security-common/pom.xml
@@ -52,6 +52,11 @@
       <artifactId>eagle-alert-service</artifactId>
       <version>${project.version}</version>
     </dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-metadata-base</artifactId>
+          <version>${project.version}</version>
+      </dependency>
 	<dependency>
 		<groupId>org.json</groupId>
 		<artifactId>json</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
new file mode 100644
index 0000000..e3c9f95
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/IMetadataServiceClient.java
@@ -0,0 +1,32 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.service;
+
+import java.io.Closeable;
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * Service stub for listing and adding HBase sensitivity metadata held by a remote metadata service.
+ */
+public interface IMetadataServiceClient extends Closeable, Serializable {
+    Collection<HBaseSensitivityEntity> listHBaseSensitivies();
+    OpResult addHBaseSensitivity(Collection<HBaseSensitivityEntity> h);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
index 45e729e..534fb38 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/ISecurityMetadataDAO.java
@@ -18,6 +18,7 @@
 package org.apache.eagle.security.service;
 
 import java.util.Collection;
+
 /**
  * Since 6/10/16.
  */

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
index 53d7132..27aeb57 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/InMemMetadataDaoImpl.java
@@ -28,7 +28,7 @@ import java.util.*;
 /**
  * In memory service for simple service start. Make all service API as
  * synchronized.
- * 
+ *
  * @since Apr 11, 2016
  *
  */
@@ -39,8 +39,7 @@ public class InMemMetadataDaoImpl implements ISecurityMetadataDAO {
     private Map<Pair<String, String>, HBaseSensitivityEntity> hBaseSensitivityEntities = new HashMap<>();
 
     @Inject
-    public InMemMetadataDaoImpl(Config config) {
-        
+    public InMemMetadataDaoImpl() {
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
index f17fd43..65e86f0 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataDaoFactory.java
@@ -17,7 +17,6 @@
 package org.apache.eagle.security.service;
 
 import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,41 +27,28 @@ import java.lang.reflect.Constructor;
  *
  */
 public class MetadataDaoFactory {
-
-    private static final MetadataDaoFactory INSTANCE = new MetadataDaoFactory();
     private static final Logger LOG = LoggerFactory.getLogger(MetadataDaoFactory.class);
 
-    private ISecurityMetadataDAO dao;
-
-    private MetadataDaoFactory() {
-        Config config = ConfigFactory.load();
-        Config datastoreConfig = config.getConfig("datastore");
-        if (datastoreConfig == null) {
-            LOG.warn("datastore is not configured, use in-memory store !!!");
-            dao = new InMemMetadataDaoImpl(datastoreConfig);
+    public static ISecurityMetadataDAO getMetadataDAO(String storeCls) {
+        ISecurityMetadataDAO dao = null;
+        if (storeCls == null) {
+            LOG.warn("metadata store is not configured, use in-memory store !!!");
+            dao = new InMemMetadataDaoImpl();
         } else {
-            String clsName = datastoreConfig.getString("metadataDao");
             Class<?> clz;
             try {
-                clz = Thread.currentThread().getContextClassLoader().loadClass(clsName);
+                clz = Thread.currentThread().getContextClassLoader().loadClass(storeCls);
                 if (ISecurityMetadataDAO.class.isAssignableFrom(clz)) {
-                    Constructor<?> cotr = clz.getConstructor(Config.class);
-                    dao = (ISecurityMetadataDAO) cotr.newInstance(datastoreConfig);
+                    Constructor<?> cotr = clz.getConstructor();
+                    dao = (ISecurityMetadataDAO) cotr.newInstance();
                 } else {
                     throw new Exception("metadataDao configuration need to be implementation of IMetadataDao! ");
                 }
             } catch (Exception e) {
                 LOG.error("error when initialize the dao, fall back to in memory mode!", e);
-                dao = new InMemMetadataDaoImpl(datastoreConfig);
+                dao = new InMemMetadataDaoImpl();
             }
         }
-    }
-
-    public static MetadataDaoFactory getInstance() {
-        return INSTANCE;
-    }
-
-    public ISecurityMetadataDAO getMetadataDao() {
         return dao;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
new file mode 100644
index 0000000..676bde5
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/MetadataServiceClientImpl.java
@@ -0,0 +1,114 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.service;
+
+import com.sun.jersey.api.client.Client;
+import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.api.client.config.ClientConfig;
+import com.sun.jersey.api.client.config.DefaultClientConfig;
+import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.typesafe.config.Config;
+import org.codehaus.jackson.jaxrs.JacksonJsonProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+/** Jersey-based {@link IMetadataServiceClient}; the transient HTTP client is rebuilt on demand after deserialization. */
+public class MetadataServiceClientImpl implements IMetadataServiceClient {
+    private static final long serialVersionUID = 3003976065082684128L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetadataServiceClientImpl.class);
+
+    private static final String METADATA_LIST_HBASE_SENSITIVITY_PATH = "/metadata/sensitivity/hbase";
+    private static final String METADATA_ADD_HBASE_SENSITIVITY_PATH = "/metadata/sensitivity/hbase";
+    private static final String METADATA_CLEAR_PATH = "/metadata/clear";
+
+    private static final String EAGLE_CORRELATION_CONTEXT = "metadataService.context";
+    private static final String EAGLE_CORRELATION_SERVICE_PORT = "metadataService.port";
+    private static final String EAGLE_CORRELATION_SERVICE_HOST = "metadataService.host";
+
+    protected static final String CONTENT_TYPE = "Content-Type";
+
+    private String host;
+    private int port;
+    private String context;
+    private transient Client client;
+    private String basePath;
+
+    /** Reads host, port and context from the {@code metadataService.*} keys; the delegated constructor sets the base path. */
+    public MetadataServiceClientImpl(Config config) {
+        this(config.getString(EAGLE_CORRELATION_SERVICE_HOST), config.getInt(EAGLE_CORRELATION_SERVICE_PORT), config
+                .getString(EAGLE_CORRELATION_CONTEXT));
+    }
+
+    /** Stores the endpoint, derives the base path {@code http://host:port} followed by {@code context}, and creates the client. */
+    public MetadataServiceClientImpl(String host, int port, String context) {
+        this.host = host;
+        this.port = port;
+        this.context = context;
+        this.basePath = "http://" + host + ":" + port + context;
+        jerseyClient();
+    }
+
+    /** Returns the Jersey client, creating it when absent (e.g. the transient field is null after deserialization). Not synchronized. */
+    private Client jerseyClient() {
+        if (client == null) {
+            ClientConfig cc = new DefaultClientConfig();
+            cc.getProperties().put(DefaultClientConfig.PROPERTY_CONNECT_TIMEOUT, 60 * 1000);
+            cc.getProperties().put(DefaultClientConfig.PROPERTY_READ_TIMEOUT, 60 * 1000);
+            cc.getClasses().add(JacksonJsonProvider.class);
+            cc.getProperties().put(URLConnectionClientHandler.PROPERTY_HTTP_URL_CONNECTION_SET_METHOD_WORKAROUND, true);
+            client = Client.create(cc);
+            client.addFilter(new com.sun.jersey.api.client.filter.GZIPContentEncodingFilter());
+        }
+        return client;
+    }
+
+    private <T> List<T> list(String path, GenericType<List<T>> type) {
+        WebResource r = jerseyClient().resource(basePath + path);
+        LOG.info("query URL {}", basePath + path);
+        return r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).get(type);
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (client != null) { // nothing to release if the client was never (re)created
+            client.destroy();
+        }
+    }
+
+    @Override
+    public Collection<HBaseSensitivityEntity> listHBaseSensitivies() {
+        return list(METADATA_LIST_HBASE_SENSITIVITY_PATH, new GenericType<List<HBaseSensitivityEntity>>() {
+        });
+    }
+
+    @Override
+    public OpResult addHBaseSensitivity(Collection<HBaseSensitivityEntity> h) {
+        WebResource r = jerseyClient().resource(basePath + METADATA_ADD_HBASE_SENSITIVITY_PATH);
+        r.accept(MediaType.APPLICATION_JSON_TYPE).type(MediaType.APPLICATION_JSON).post(h);
+        return new OpResult();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
deleted file mode 100644
index 05440fb..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/service/SensitivityMetadataResource.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.service;
-
-import javax.ws.rs.Path;
-
-/**
- * Since 6/10/16.
- */
-@Path("/metadata/sensitivity")
-public class SensitivityMetadataResource {
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
index e60e59e..3401b3c 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/topo/NewKafkaSourcedSpoutProvider.java
@@ -56,8 +56,6 @@ public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider {
         String groupId = context.getString("consumerGroupId");
         // Kafka fetch size
         int fetchSize = context.getInt("fetchSize");
-        // Kafka deserializer class
-        String deserClsName = context.getString("deserializerClass");
         // Kafka broker zk connection
         String zkConnString = context.getString("zkConnection");
         // transaction zkRoot

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
new file mode 100644
index 0000000..662311c
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppConf.java
@@ -0,0 +1,28 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.hbase;
+
+import org.apache.eagle.app.Configuration;
+
+/**
+ * Since 8/5/16.
+ */
+public class HBaseAuditLogAppConf extends Configuration{
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
new file mode 100644
index 0000000..051d8c4
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogAppProvider.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  * <p/>
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  * <p/>
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.hbase;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+import org.apache.eagle.security.service.MetadataDaoFactory;
+
+/**
+ * Since 8/5/16.
+ */
+public class HBaseAuditLogAppProvider extends AbstractApplicationProvider<HBaseAuditLogApplication> {
+    public HBaseAuditLogAppProvider() {
+        super("/META-INF/metadata.xml");
+    }
+
+    @Override
+    public HBaseAuditLogApplication getApplication() {
+        return new HBaseAuditLogApplication();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
index 49393e1..3d80308 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
@@ -16,27 +16,35 @@
  */
 package org.apache.eagle.security.hbase;
 
+import backtype.storm.generated.StormTopology;
 import backtype.storm.topology.BoltDeclarer;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
 import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
 import storm.kafka.StringScheme;
-import storm.kafka.bolt.KafkaBolt;
 
 /**
  * Since 7/27/16.
  */
-public class HBaseAuditLogApplication{
+public class HBaseAuditLogApplication extends StormApplication<HBaseAuditLogAppConf> {
     public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
     public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
     public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
     public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
-    protected void buildApp(TopologyBuilder builder) {
-        System.setProperty("config.resource", "/application.conf");
-        Config config = ConfigFactory.load();
+    @Override
+    public StormTopology execute(HBaseAuditLogAppConf config1, StormEnvironment environment) {
+        return null;
+    }
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
         NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
         IRichSpout spout = provider.getSpout(config);
 
@@ -55,8 +63,15 @@ public class HBaseAuditLogApplication{
         BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
         joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
 
-        KafkaBolt kafkaBolt = new KafkaBolt();
-        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
-        kafkaBoltDeclarer.shuffleGrouping("joinBolt");
+        StormStreamSink sinkBolt = environment.getFlattenStreamSink("hbase_audit_log_stream",config);
+        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
+        kafkaBoltDeclarer.fieldsGrouping("joinBolt", new Fields("user"));
+        return builder.createTopology();
+    }
+
+    public static void main(String[] args){
+        Config config = ConfigFactory.load();
+        HBaseAuditLogApplication app = new HBaseAuditLogApplication();
+        app.run(config);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
deleted file mode 100644
index 13ca214..0000000
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringMain.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.security.hbase;
-
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
-import org.apache.eagle.security.topo.TopologySubmitter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.*;
-import storm.kafka.bolt.KafkaBolt;
-
-public class HbaseAuditLogMonitoringMain {
-    private static Logger LOG = LoggerFactory.getLogger(HbaseAuditLogMonitoringMain.class);
-    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
-    public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
-    public final static String JOIN_TASK_NUM = "topology.numOfJoinTasks";
-    public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
-
-    public static void main(String[] args) throws Exception{
-        System.setProperty("config.resource", "/application.conf");
-        Config config = ConfigFactory.load();
-        NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
-        IRichSpout spout = provider.getSpout(config);
-
-        HBaseAuditLogParserBolt bolt = new HBaseAuditLogParserBolt();
-        TopologyBuilder builder = new TopologyBuilder();
-
-        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
-        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
-        int numOfJoinTasks = config.getInt(JOIN_TASK_NUM);
-        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
-
-        builder.setSpout("ingest", spout, numOfSpoutTasks);
-        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks);
-        boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
-
-        HbaseResourceSensitivityDataJoinBolt joinBolt = new HbaseResourceSensitivityDataJoinBolt(config);
-        BoltDeclarer joinBoltDeclarer = builder.setBolt("joinBolt", joinBolt, numOfJoinTasks);
-        joinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
-
-        KafkaBolt kafkaBolt = new KafkaBolt();
-        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", kafkaBolt, numOfSinkTasks);
-        kafkaBoltDeclarer.shuffleGrouping("joinBolt");
-
-        StormTopology topology = builder.createTopology();
-
-        TopologySubmitter.submit(topology, config);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
index cf486d3..d8d9d6b 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityDataJoinBolt.java
@@ -23,14 +23,12 @@ import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.util.ExternalDataJoiner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
 
 import java.util.Arrays;
 import java.util.Map;
@@ -94,9 +92,7 @@ public class HbaseResourceSensitivityDataJoinBolt extends BaseRichBolt {
             }
             LOG.info("After hbase resource sensitivity lookup: " + newEvent);
             // push to Kafka sink
-            ObjectMapper mapper = new ObjectMapper();
-            String msg = mapper.writeValueAsString(map);
-            collector.emit(Arrays.asList(newEvent.get("user"), msg));
+            collector.emit(Arrays.asList(newEvent.get("user"), newEvent));
         }catch(Exception ex){
             LOG.error("error joining data, ignore it", ex);
         }finally {
@@ -106,6 +102,6 @@ public class HbaseResourceSensitivityDataJoinBolt extends BaseRichBolt {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(FieldNameBasedTupleToKafkaMapper.BOLT_KEY, FieldNameBasedTupleToKafkaMapper.BOLT_MESSAGE));
+        declarer.declare(new Fields("user", "message"));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
index 9ca0701..603ed5a 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityPollingJob.java
@@ -19,10 +19,8 @@ package org.apache.eagle.security.hbase;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Maps;
-import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
-import org.apache.eagle.security.service.HBaseSensitivityEntity;
-import org.apache.eagle.security.service.ISecurityMetadataDAO;
-import org.apache.eagle.security.service.MetadataDaoFactory;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.security.service.*;
 import org.apache.eagle.security.util.AbstractResourceSensitivityPollingJob;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.quartz.Job;
@@ -33,7 +31,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 
 public class HbaseResourceSensitivityPollingJob extends AbstractResourceSensitivityPollingJob implements Job {
@@ -44,12 +41,33 @@ public class HbaseResourceSensitivityPollingJob extends AbstractResourceSensitiv
             throws JobExecutionException {
         JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
         try {
-            ISecurityMetadataDAO dao = MetadataDaoFactory.getInstance().getMetadataDao();
-            Collection<HBaseSensitivityEntity> sensitivityEntities = dao.listHBaseSensitivies();
-            ExternalDataCache.getInstance().setJobResult(getClass(), sensitivityEntities);
+            Collection<HBaseSensitivityEntity> sensitivityEntities = load(jobDataMap);
+            Map<String, HBaseSensitivityEntity> map = Maps.uniqueIndex(
+                    sensitivityEntities,
+                    new Function<HBaseSensitivityEntity, String>() {
+                        @Override
+                        public String apply(HBaseSensitivityEntity input) {
+                            return input.getHbaseResource();
+                        }
+                    });
+            ExternalDataCache.getInstance().setJobResult(getClass(), map);
         } catch(Exception ex) {
         	LOG.error("Fail to load hbase resource sensitivity data", ex);
         }
     }
 
-}
\ No newline at end of file
+    private Collection<HBaseSensitivityEntity> load(JobDataMap jobDataMap) throws Exception {
+        Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
+        String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
+        Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
+        String username = map.containsKey(EagleConfigConstants.USERNAME) ? (String)map.get(EagleConfigConstants.USERNAME) : null;
+        String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String)map.get(EagleConfigConstants.PASSWORD) : null;
+
+        // load from eagle database
+        LOG.info("Load hbase resource sensitivity information from eagle service "
+                + eagleServiceHost + ":" + eagleServicePort);
+
+        IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest");
+        return client.listHBaseSensitivies();
+    }
+}


[3/3] incubator-eagle git commit: HBase audit monitoring with new app framework https://issues.apache.org/jira/browse/EAGLE-420 Author: Yong Zhang Reviewer: Hao Chen

Posted by yo...@apache.org.
HBase audit monitoring with new app framework
https://issues.apache.org/jira/browse/EAGLE-420
Author: Yong Zhang
Reviewer: Hao Chen


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/660bfbd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/660bfbd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/660bfbd3

Branch: refs/heads/develop
Commit: 660bfbd3fac6febddb612b9b629a8d466b536b3d
Parents: 1d84256
Author: yonzhang <yo...@gmail.com>
Authored: Sun Aug 7 17:49:48 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Sun Aug 7 17:49:48 2016 -0700

----------------------------------------------------------------------
 .../TestSiddhiStateSnapshotAndRestore.java      | 506 -------------------
 .../resolver/AttributeResolveResource.java      |   6 +-
 .../apache/eagle/app/AbstractApplication.java   | 143 +++---
 .../java/org/apache/eagle/app/Application.java  |  11 +-
 .../eagle/app/environment/ExecutionRuntime.java | 107 ++--
 .../environment/impl/SparkExecutionRuntime.java | 117 ++---
 .../app/environment/impl/StormEnvironment.java  |  84 +--
 .../environment/impl/StormExecutionRuntime.java | 327 ++++++------
 .../eagle/app/service/ApplicationContext.java   |  13 +-
 .../impl/ApplicationManagementServiceImpl.java  |  36 +-
 .../apache/eagle/app/sink/KafkaStreamSink.java  |  52 +-
 .../eagle/app/sink/KafkaStreamSinkConfig.java   |  29 +-
 .../eagle/app/sink/LoggingStreamSink.java       |   5 +-
 .../apache/eagle/app/sink/StormStreamSink.java  |   2 +-
 .../eagle/app/sink/StreamSinkProvider.java      |  93 ++--
 .../app/spi/AbstractApplicationProvider.java    |   2 +-
 .../eagle/app/spi/ApplicationProvider.java      |   2 +-
 .../eagle/app/test/ServerSimulatorImpl.java     |   7 +-
 .../apache/eagle/app/TestStormApplication.java  | 174 ++++---
 .../eagle/app/storm/MockStormApplication.java   | 191 ++++---
 .../app/storm/MockStormApplicationTest.java     | 126 ++---
 .../src/test/resources/application.conf         |  17 +-
 .../ApplicationManagementResource.java          |   1 -
 .../entity/GenericServiceAPIResponseEntity.java |  12 +-
 ...ricServiceAPIResponseEntityDeserializer.java |   5 +-
 .../generic/GenericEntityServiceResource.java   |   6 +-
 .../app/example/ExampleStormApplication.java    |  10 +-
 .../example/ExampleApplicationProviderTest.java |  33 +-
 .../src/test/resources/application.conf         |  12 +-
 .../apache/eagle/app/jpm/JPMApplication.java    |   8 +-
 .../eagle/app/jpm/JPMApplicationTest.java       |  19 +-
 eagle-security/eagle-security-common/pom.xml    |   5 +
 .../service/IMetadataServiceClient.java         |  32 ++
 .../security/service/ISecurityMetadataDAO.java  |   1 +
 .../security/service/InMemMetadataDaoImpl.java  |   5 +-
 .../security/service/MetadataDaoFactory.java    |  32 +-
 .../service/MetadataServiceClientImpl.java      | 114 +++++
 .../service/SensitivityMetadataResource.java    |  27 -
 .../topo/NewKafkaSourcedSpoutProvider.java      |   2 -
 .../security/hbase/HBaseAuditLogAppConf.java    |  28 +
 .../hbase/HBaseAuditLogAppProvider.java         |  38 ++
 .../hbase/HBaseAuditLogApplication.java         |  31 +-
 .../hbase/HbaseAuditLogMonitoringMain.java      |  72 ---
 .../HbaseResourceSensitivityDataJoinBolt.java   |   8 +-
 .../HbaseResourceSensitivityPollingJob.java     |  36 +-
 .../src/main/resources/META-INF/metadata.xml    | 243 +++++++++
 ...org.apache.eagle.app.spi.ApplicationProvider |  35 ++
 .../src/main/resources/application.conf         |   8 +-
 .../src/main/resources/application.conf.bak     |  66 ---
 .../src/main/resources/metadata.xml             |  91 ----
 .../src/main/resources/scripts                  |  22 +
 .../hbase/TestHbaseAuditLogProcessTopology.java |  44 --
 .../hbase/SensitivityMetadataResource.java      |  64 +++
 eagle-server/pom.xml                            |  14 +
 .../src/main/resources/application.conf         |   2 +-
 .../src/main/resources/configuration.yml        |  21 +
 .../eagle/server/ServerApplicationTest.java     |   4 +-
 .../src/test/resources/configuration.yml        |  21 +
 58 files changed, 1609 insertions(+), 1613 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
deleted file mode 100644
index 131be28..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiStateSnapshotAndRestore.java
+++ /dev/null
@@ -1,506 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.state;
-
-import org.apache.eagle.common.DateTimeUtil;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-import org.wso2.siddhi.core.util.persistence.InMemoryPersistenceStore;
-import org.wso2.siddhi.core.util.persistence.PersistenceStore;
-
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-
-/**
- * experiment Siddhi state snapshot and restore
- */
-public class TestSiddhiStateSnapshotAndRestore {
-    private ExecutionPlanRuntime setupRuntimeForSimple(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "" +
-                "define stream testStream (cmd string, src string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from testStream[(cmd == 'rename') and (src == '/tmp/pii')] " +
-                "select cmd, src " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-        executionPlanRuntime.addCallback("query1", callback);
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Test
-    public void testSimpleSiddhiQuery() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForSimple();
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii"});
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForSimple();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii"});
-        input.close();
-        restoredRuntime.shutdown();
-    }
-
-    private ExecutionPlanRuntime setupRuntimeForLengthSlideWindow(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream testStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream#window.length(3) "
-                + " select *"
-                + " insert all events into OutputStream";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Ignore
-    public void testLengthSlideWindow() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state-lengthslidewindow";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForLengthSlideWindow();
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_1"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_2"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_3"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_4"});
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForLengthSlideWindow();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_5"});
-        input.close();
-        restoredRuntime.shutdown();
-    }
-
-    private ExecutionPlanRuntime setupRuntimeForLengthSlideWindowWithGroupby(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream testStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream#window.length(50) "
-                + " select user, cmd, count(user) as cnt"
-                + " insert all events into OutputStream";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Ignore
-    public void testLengthSlideWindowWithGroupby() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state-lengthslidewindowwithgroupby";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForLengthSlideWindowWithGroupby();
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_1"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_2"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_3"});
-        executionPlanRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_4"});
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForLengthSlideWindowWithGroupby();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        restoredRuntime.getInputHandler("testStream").send(new Object[]{"rename", "/tmp/pii_5"});
-        input.close();
-        restoredRuntime.shutdown();
-    }
-
-    private ExecutionPlanRuntime setupRuntimeForTimeSlideWindow(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,3 sec)"
-                + " select user, timeStamp " +
-                "insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Test
-    public void testTimeSlideWindow() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state-timeslidewindow";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForTimeSlideWindow();
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
-        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
-        inputHandler.send(new Object[]{curTime, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
-
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForTimeSlideWindow();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        inputHandler = restoredRuntime.getInputHandler("testStream");
-        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
-        Thread.sleep(1000);
-        input.close();
-        restoredRuntime.shutdown();
-    }
-    
-    private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,30 sec)"
-                + " select user, timeStamp, count(user) as cnt"
-                + " group by user"
-                + " having cnt > 2"
-               + " insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Test
-    public void testExternalTimeSlideWindowWithGroupby() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state-externaltimeslidewindow";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby();
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
-        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
-        inputHandler.send(new Object[]{curTime, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
-        Thread.sleep(1000);
-
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        inputHandler = restoredRuntime.getInputHandler("testStream");
-        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
-        Thread.sleep(1000);
-        input.close();
-        restoredRuntime.shutdown();
-    }
-
-    private ExecutionPlanRuntime setupRuntimeForExternalTimeSlideWindowWithGroupby_2(SiddhiManager siddhiManager){
-        String cseEventStream = "define stream testStream (timeStamp long, user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.externalTime(timeStamp,300 sec)"
-                + " select user, timeStamp, count(user) as cnt"
-                + " group by user"
-                + " insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-        return executionPlanRuntime;
-    }
-
-    @Test
-    public void testExternalTimeSlideWindowWithGroupby_2() throws Exception{
-        SiddhiManager siddhiManager = new SiddhiManager();
-        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
-        siddhiManager.setPersistenceStore(persistenceStore);
-
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
-        executionPlanRuntime.start();
-        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
-        inputHandler.send(new Object[]{curTime, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 1000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 2000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 3000, "user", "open"});
-        Thread.sleep(100);
-        executionPlanRuntime.persist();
-        executionPlanRuntime.shutdown();
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForExternalTimeSlideWindowWithGroupby_2(siddhiManager);
-        inputHandler = restoredRuntime.getInputHandler("testStream");
-        restoredRuntime.start();
-        restoredRuntime.restoreLastRevision();
-        inputHandler.send(new Object[]{curTime + 4000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 5000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 6000, "user", "open"});
-        inputHandler.send(new Object[]{curTime + 7000, "user", "open"});
-        Thread.sleep(1000);
-        restoredRuntime.shutdown();
-    }
-
-    private ExecutionPlanRuntime setupRuntimeForInternalTimeSlideWindowWithGroupby(){
-        SiddhiManager siddhiManager = new SiddhiManager();
-        String cseEventStream = "define stream testStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from testStream[cmd == 'open']#window.time(5 sec)"
-                + " select user, count(user) as cnt"
-                + " group by user"
-                + " having cnt > 2"
-                + " insert events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime("@Plan:name('testPlan') " + cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-        executionPlanRuntime.start();
-        return executionPlanRuntime;
-    }
-
-    @Test
-    public void testInternalTimeSlideWindowWithGroupby() throws Exception{
-        String tmpdir = System.getProperty("java.io.tmpdir");
-        System.out.println("temporary directory: " + tmpdir);
-
-        String stateFile = tmpdir + "/siddhi-state-internaltimeslidewindow";
-        ExecutionPlanRuntime executionPlanRuntime = setupRuntimeForInternalTimeSlideWindowWithGroupby();
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("testStream");
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-
-        byte[] state = executionPlanRuntime.snapshot();
-        int length = state.length;
-        FileOutputStream output = new FileOutputStream(stateFile);
-        output.write(state);
-        output.close();
-        executionPlanRuntime.shutdown();
-
-        ExecutionPlanRuntime restoredRuntime = setupRuntimeForInternalTimeSlideWindowWithGroupby();
-        FileInputStream input = new FileInputStream(stateFile);
-        byte[] restoredState = new byte[length];
-        input.read(restoredState);
-        restoredRuntime.restore(restoredState);
-        inputHandler = restoredRuntime.getInputHandler("testStream");
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        inputHandler.send(new Object[]{"user", "open"});
-        Thread.sleep(1000);
-        input.close();
-        restoredRuntime.shutdown();
-    }
-
-    private int count;
-    private boolean eventArrived;
-
-    @Before
-    public void init() {
-        count = 0;
-        eventArrived = false;
-    }
-
-    /**
-     * Siddhi does not support external time window based snapshot
-     * @throws InterruptedException
-     */
-    public void persistenceTest7() throws InterruptedException {
-        PersistenceStore persistenceStore = new InMemoryPersistenceStore();
-
-        SiddhiManager siddhiManager = new SiddhiManager();
-        siddhiManager.setPersistenceStore(persistenceStore);
-
-        String executionPlan = "" +
-                "@plan:name('Test') " +
-                "" +
-                "define stream StockStream (symbol string, price float, volume int, timestamp long);" +
-                "" +
-                "@info(name = 'query1')" +
-                "from StockStream#window.externalTime(timestamp,30 sec) " +
-                "select symbol, price, sum(volume) as totalVol, count(symbol) as cnt " +
-                "group by symbol " +
-                "insert into OutStream ";
-
-        QueryCallback queryCallback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                eventArrived = true;
-                for (Event inEvent : inEvents) {
-                    count++;
-                    Assert.assertTrue("IBM".equals(inEvent.getData(0)) || "WSO2".equals(inEvent.getData(0)));
-                    if (count == 5) {
-                        Assert.assertEquals(400l, inEvent.getData(2));
-                    }
-                    if (count == 6) {
-                        Assert.assertEquals(200l, inEvent.getData(2));
-                    }
-                }
-            }
-        };
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-        executionPlanRuntime.addCallback("query1", queryCallback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("StockStream");
-        executionPlanRuntime.start();
-        long currentTime = 0;
-
-        inputHandler.send(new Object[]{"IBM", 75.1f, 100, currentTime + 1000});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{"WSO2", 75.2f, 100, currentTime + 2000});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{"IBM", 75.3f, 100, currentTime + 3000});
-
-        Thread.sleep(500);
-        Assert.assertTrue(eventArrived);
-        Assert.assertEquals(3, count);
-
-        //persisting
-        Thread.sleep(500);
-        executionPlanRuntime.persist();
-
-        //restarting execution plan
-        Thread.sleep(500);
-        executionPlanRuntime.shutdown();
-        executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-        executionPlanRuntime.addCallback("query1", queryCallback);
-        inputHandler = executionPlanRuntime.getInputHandler("StockStream");
-        executionPlanRuntime.start();
-
-        //loading
-        executionPlanRuntime.restoreLastRevision();
-
-        inputHandler.send(new Object[]{"IBM", 75.4f, 100, currentTime + 4000});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{"IBM", 75.5f, 100, currentTime + 5000});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{"WSO2", 75.6f, 100, currentTime + 6000});
-
-        //shutdown execution plan
-        Thread.sleep(500);
-        executionPlanRuntime.shutdown();
-
-        Assert.assertEquals(count, 6);
-        Assert.assertEquals(true, eventArrived);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
index 68995d1..bcffec1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResource.java
@@ -49,7 +49,7 @@ public class AttributeResolveResource {
             response.setObj(result);
         } catch (Exception e) {
             response.setSuccess(false);
-            response.setException(EagleExceptionWrapper.wrap(e));
+            response.setException(e);
             return response;
         }
         return response;
@@ -73,9 +73,9 @@ public class AttributeResolveResource {
             response.setObj(result);
         } catch (Exception e) {
             response.setSuccess(false);
-            response.setException(EagleExceptionWrapper.wrap(e));
+            response.setException(e);
             return response;
         }
         return response;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
index dca2e3d..5b498eb 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
@@ -1,69 +1,74 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.environment.Environment;
-import org.apache.eagle.app.environment.ExecutionRuntimeManager;
-import org.apache.eagle.app.utils.ApplicationConfigHelper;
-
-import java.lang.reflect.ParameterizedType;
-import java.util.Map;
-
-abstract class AbstractApplication<Conf extends Configuration,Env extends Environment,Proc> implements Application<Conf,Env,Proc>, ApplicationTool<Conf> {
-    private Class<Conf> parametrizedConfigClass;
-
-    @Override
-    public Proc execute(Map<String, Object> config, Env env) {
-        return execute(ApplicationConfigHelper.convertFrom(config, getConfigType()),env);
-    }
-
-    /**
-     *  Map application configuration from environment
-     *
-     * @param config
-     * @return
-     */
-    private Conf loadAppConfigFromEnv(Config config){
-        return ApplicationConfigHelper.convertFrom(ApplicationConfigHelper.unwrapFrom(config,getClass().getCanonicalName()), getConfigType());
-    }
-
-    @Override
-    public void run(Config config) {
-        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(),config).start(this,loadAppConfigFromEnv(config));
-    }
-
-    @Override
-    public void run(Configuration conf, Config config) {
-        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(), config).start(this,conf);
-    }
-
-    @Override
-    public Proc execute(Env environment) {
-        return execute(loadAppConfigFromEnv(environment.config()),environment);
-    }
-
-    /**
-     * @return Config class from Generic Type
-     */
-    public Class<Conf> getConfigType(){
-        if (parametrizedConfigClass == null) {
-            this.parametrizedConfigClass = (Class<Conf>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
-        }
-        return parametrizedConfigClass;
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.environment.Environment;
+import org.apache.eagle.app.environment.ExecutionRuntimeManager;
+import org.apache.eagle.app.utils.ApplicationConfigHelper;
+
+import java.lang.reflect.ParameterizedType;
+import java.util.Map;
+
+abstract class AbstractApplication<Conf extends Configuration,Env extends Environment,Proc> implements Application<Conf,Env,Proc>, ApplicationTool<Conf> {
+    private Class<Conf> parametrizedConfigClass;
+
+    @Override
+    public Proc execute(Map<String, Object> config, Env env) {
+        return execute(ApplicationConfigHelper.convertFrom(config, getConfigType()),env);
+    }
+
+    @Override
+    public Proc execute(Config config, Env environment){
+        return null;
+    }
+    /**
+     *  Map application configuration from environment
+     *
+     * @param config
+     * @return
+     */
+    private Conf loadAppConfigFromEnv(Config config){
+        return ApplicationConfigHelper.convertFrom(ApplicationConfigHelper.unwrapFrom(config,getClass().getCanonicalName()), getConfigType());
+    }
+
+    @Override
+    public void run(Config config) {
+//        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(),config).start(this,loadAppConfigFromEnv(config));
+        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(),config).start(this,config);
+    }
+
+    @Override
+    public void run(Configuration conf, Config config) {
+//        ExecutionRuntimeManager.getInstance().getRuntime(getEnvironmentType(), config).start(this,conf);
+    }
+
+    @Override
+    public Proc execute(Env environment) {
+        return execute(loadAppConfigFromEnv(environment.config()),environment);
+    }
+
+    /**
+     * @return Config class from Generic Type
+     */
+    public Class<Conf> getConfigType(){
+        if (parametrizedConfigClass == null) {
+            this.parametrizedConfigClass = (Class<Conf>) ((ParameterizedType) getClass().getGenericSuperclass()).getActualTypeArguments()[0];
+        }
+        return parametrizedConfigClass;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
index 699b13e..d1e4c9b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.app;
 
+import com.typesafe.config.Config;
 import org.apache.eagle.app.environment.Environment;
 
 import java.io.Serializable;
@@ -63,6 +64,14 @@ public interface Application <
     Proc execute(Map<String,Object> config, Env environment);
 
     /**
+     * Execute with type-safe configuration
+     * @param config
+     * @param environment
+     * @return
+     */
+    Proc execute(Config config, Env environment);
+
+    /**
      * Execute with environment based configuration
      *
      * Light-weight Runner (dry-run/test purpose) oriented interface
@@ -81,4 +90,4 @@ public interface Application <
      * @return application environment type
      */
     Class<? extends Env> getEnvironmentType();
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
index 7605d92..c4e89f4 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/ExecutionRuntime.java
@@ -1,53 +1,54 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment;
-
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.Configuration;
-
-/**
- * Execution Runtime Adapter
- */
-public interface ExecutionRuntime<Env extends Environment, Proc> {
-    /**
-     * @param environment
-     */
-    void prepare(Env environment);
-
-    Env environment();
-
-    /**
-     * @param executor
-     * @param config
-     * @param <Conf>
-     */
-    <Conf extends Configuration> void start(Application<Conf,Env, Proc> executor, Conf config);
-
-    /**
-     * @param executor
-     * @param config
-     * @param <Conf>
-     */
-    <Conf extends Configuration> void stop(Application<Conf,Env, Proc> executor, Conf config);
-
-    /**
-     * @param executor
-     * @param config
-     * @param <Conf>
-     */
-    <Conf extends Configuration> void status(Application<Conf,Env, Proc> executor, Conf config);
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+
+/**
+ * Execution Runtime Adapter
+ */
+public interface ExecutionRuntime<Env extends Environment, Proc> {
+    /**
+     * @param environment
+     */
+    void prepare(Env environment);
+
+    Env environment();
+
+    /**
+     * @param executor
+     * @param config
+     * @param <Conf>
+     */
+    <Conf extends Configuration> void start(Application<Conf,Env, Proc> executor, Config config);
+
+    /**
+     * @param executor
+     * @param config
+     * @param <Conf>
+     */
+    <Conf extends Configuration> void stop(Application<Conf,Env, Proc> executor, Config config);
+
+    /**
+     * @param executor
+     * @param config
+     * @param <Conf>
+     */
+    <Conf extends Configuration> void status(Application<Conf,Env, Proc> executor, Config config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
index 5bcde92..5d4e049 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/SparkExecutionRuntime.java
@@ -1,58 +1,59 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-
-public class SparkExecutionRuntime implements ExecutionRuntime<SparkEnvironment,Object> {
-    @Override
-    public void prepare(SparkEnvironment environment) {
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public SparkEnvironment environment() {
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public void start(Application executor, Configuration config) {
-
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public void stop(Application executor, Configuration config) {
-
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    @Override
-    public void status(Application executor, Configuration config) {
-        throw new RuntimeException("Not implemented yet");
-    }
-
-    public static class Provider implements ExecutionRuntimeProvider<SparkEnvironment,Object> {
-        @Override
-        public SparkExecutionRuntime get() {
-            return new SparkExecutionRuntime();
-        }
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.ExecutionRuntime;
+import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
+
+/** Placeholder Spark runtime: no operation is implemented yet, so every lifecycle call fails immediately. */
+public class SparkExecutionRuntime implements ExecutionRuntime<SparkEnvironment,Object> {
+    // UnsupportedOperationException is a RuntimeException, so callers catching RuntimeException are unaffected.
+    @Override
+    public void prepare(SparkEnvironment environment) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public SparkEnvironment environment() {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public void start(Application executor, Config config) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public void stop(Application executor, Config config) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    @Override
+    public void status(Application executor, Config config) {
+        throw new UnsupportedOperationException("Not implemented yet");
+    }
+
+    public static class Provider implements ExecutionRuntimeProvider<SparkEnvironment,Object> {
+        @Override
+        public SparkExecutionRuntime get() {
+            return new SparkExecutionRuntime();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
index 4b4a0be..1112588 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormEnvironment.java
@@ -1,42 +1,42 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.environment.AbstractEnvironment;
-import org.apache.eagle.app.sink.FlattenEventMapper;
-import org.apache.eagle.app.sink.LoggingStreamSink;
-import org.apache.eagle.app.sink.StormStreamSink;
-import org.apache.eagle.app.sink.StreamSink;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Int;
-import storm.trident.spout.RichSpoutBatchExecutor;
-
-/**
- * Storm Execution Environment Context
- */
-public class StormEnvironment extends AbstractEnvironment {
-    public StormEnvironment(Config envConfig) {
-        super(envConfig);
-    }
-
-    public StormStreamSink getFlattenStreamSink(String streamId,Configuration appConfig) {
-        return ((StormStreamSink) streamSink().getSink(streamId,appConfig)).setEventMapper(new FlattenEventMapper(streamId));
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.AbstractEnvironment;
+import org.apache.eagle.app.sink.FlattenEventMapper;
+import org.apache.eagle.app.sink.LoggingStreamSink;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.app.sink.StreamSink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Int;
+import storm.trident.spout.RichSpoutBatchExecutor;
+
+/** Environment context for applications that execute on Storm. */
+public class StormEnvironment extends AbstractEnvironment {
+    public StormEnvironment(Config envConfig) {
+        super(envConfig);
+    }
+
+    /** Looks up the sink for {@code streamId} and attaches a {@link FlattenEventMapper} for that stream to it. */
+    public StormStreamSink getFlattenStreamSink(String streamId, Config config) {
+        StormStreamSink stormSink = (StormStreamSink) streamSink().getSink(streamId, config);
+        return stormSink.setEventMapper(new FlattenEventMapper(streamId));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
index 23e7334..06c22dc 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/environment/impl/StormExecutionRuntime.java
@@ -1,162 +1,165 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.environment.impl;
-
-import backtype.storm.Config;
-import backtype.storm.LocalCluster;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.*;
-import backtype.storm.utils.NimbusClient;
-import com.google.common.base.Preconditions;
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.Configuration;
-import org.apache.eagle.app.environment.ExecutionRuntime;
-import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
-import org.apache.eagle.app.utils.DynamicJarPathFinder;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.thrift7.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Int;
-import storm.trident.spout.RichSpoutBatchExecutor;
-
-public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
-    private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
-    private static LocalCluster _localCluster;
-
-    private StormEnvironment environment;
-
-//    static {
-//        Runtime.getRuntime().addShutdownHook(new Thread(){
-//            @Override
-//            public void run() {
-//                if(_localCluster != null) {
-//                    LOG.info("Shutting down local storm cluster instance");
-//                    _localCluster.shutdown();
-//                }
-//            }
-//        });
-//    }
-
-    private static LocalCluster getLocalCluster(){
-        if(_localCluster == null){
-            _localCluster = new LocalCluster();
-        }
-        return _localCluster;
-    }
-
-    @Override
-    public void prepare(StormEnvironment environment) {
-        this.environment = environment;
-    }
-
-    @Override
-    public StormEnvironment environment() {
-        return this.environment;
-    }
-
-    private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
-    private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
-    private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
-    private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
-
-    public backtype.storm.Config getStormConfig(){
-        backtype.storm.Config conf = new backtype.storm.Config();
-        conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
-        conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
-        conf.put(backtype.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32));
-        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
-        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
-        conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
-        String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
-        if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
-            nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
-            LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
-        } else {
-            LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
-        }
-        Integer nimbusThriftPort =  STORM_NIMBUS_THRIFT_DEFAULT;
-        if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
-            nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
-            LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
-        } else {
-            LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
-        }
-        conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
-        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
-        return conf;
-    }
-
-    @Override
-    public <Conf extends Configuration> void start(Application<Conf, StormEnvironment, StormTopology> executor, Conf config){
-        String topologyName = config.getAppId();
-        Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
-        StormTopology topology = executor.execute(config, environment);
-        LOG.info("Starting {} ({})",topologyName,executor.getClass().getCanonicalName());
-        Config conf = getStormConfig();
-        if(config.getMode() == ApplicationEntity.Mode.CLUSTER){
-            if(config.getJarPath() == null) config.setJarPath(DynamicJarPathFinder.findPath(executor.getClass()));
-            String jarFile = config.getJarPath();
-            synchronized (StormExecutionRuntime.class) {
-                System.setProperty("storm.jar", jarFile);
-                LOG.info("Submitting as cluster mode ...");
-                try {
-                    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
-                } catch (AlreadyAliveException | InvalidTopologyException e) {
-                    LOG.error(e.getMessage(), e);
-                    throw new RuntimeException(e.getMessage(),e);
-                } finally {
-                    System.clearProperty("storm.jar");
-                }
-            }
-        } else {
-            LOG.info("Submitting as local mode ...");
-            getLocalCluster().submitTopology(topologyName, conf, topology);
-            LOG.info("Submitted");
-        }
-    }
-
-    @Override
-    public <Conf extends Configuration> void stop(Application<Conf,StormEnvironment, StormTopology> executor, Conf config) {
-        String appId = config.getAppId();
-        if(config.getMode() == ApplicationEntity.Mode.CLUSTER){
-            Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
-            try {
-                stormClient.killTopology(appId);
-            } catch (NotAliveException | TException e) {
-                LOG.error("Failed to kill topology named {}, due to: {}",appId,e.getMessage(),e.getCause());
-            }
-        } else {
-            KillOptions killOptions = new KillOptions();
-            killOptions.set_wait_secs(0);
-            getLocalCluster().killTopologyWithOpts(appId,killOptions);
-        }
-    }
-
-    @Override
-    public <Conf extends Configuration> void status(Application<Conf,StormEnvironment, StormTopology> executor, Conf config) {
-        // TODO: Not implemented yet!
-        throw new RuntimeException("TODO: Not implemented yet!");
-    }
-
-    public static class Provider implements ExecutionRuntimeProvider<StormEnvironment,StormTopology> {
-        @Override
-        public StormExecutionRuntime get() {
-            return new StormExecutionRuntime();
-        }
-    }
-}
\ No newline at end of file
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.environment.impl;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.generated.*;
+import backtype.storm.utils.NimbusClient;
+import com.google.common.base.Preconditions;
+import org.apache.eagle.app.Application;
+import org.apache.eagle.app.Configuration;
+import org.apache.eagle.app.environment.ExecutionRuntime;
+import org.apache.eagle.app.environment.ExecutionRuntimeProvider;
+import org.apache.eagle.app.utils.DynamicJarPathFinder;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.thrift7.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Int;
+import storm.trident.spout.RichSpoutBatchExecutor;
+
+/** Runs applications as Storm topologies: via Nimbus when the config mode is CLUSTER, otherwise in a shared {@link LocalCluster}. */
+public class StormExecutionRuntime implements ExecutionRuntime<StormEnvironment,StormTopology> {
+    private final static Logger LOG = LoggerFactory.getLogger(StormExecutionRuntime.class);
+    private static LocalCluster _localCluster;
+
+    private StormEnvironment environment;
+
+    /** Creates the static local cluster on first use; nothing in this class ever shuts it down. */
+    private static LocalCluster getLocalCluster(){
+        if(_localCluster == null){
+            _localCluster = new LocalCluster();
+        }
+        return _localCluster;
+    }
+
+    @Override
+    public void prepare(StormEnvironment environment) {
+        this.environment = environment;
+    }
+
+    @Override
+    public StormEnvironment environment() {
+        return this.environment;
+    }
+
+    private final static String STORM_NIMBUS_HOST_CONF_PATH = "application.storm.nimbusHost";
+    private final static String STORM_NIMBUS_HOST_DEFAULT = "localhost";
+    private final static Integer STORM_NIMBUS_THRIFT_DEFAULT = 6627;
+    private final static String STORM_NIMBUS_THRIFT_CONF_PATH = "application.storm.nimbusThriftPort";
+
+    /** Builds the Storm config: fixed buffer sizes plus Nimbus host/port taken from the environment config, or defaults when unset. */
+    public backtype.storm.Config getStormConfig(){
+        backtype.storm.Config conf = new backtype.storm.Config();
+        conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024));
+        conf.put(backtype.storm.Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8));
+        conf.put(backtype.storm.Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, Int.box(32));
+        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, Int.box(16384));
+        conf.put(backtype.storm.Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, Int.box(16384));
+        conf.put(backtype.storm.Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE, Int.box(20480000));
+        String nimbusHost = STORM_NIMBUS_HOST_DEFAULT;
+        if(environment.config().hasPath(STORM_NIMBUS_HOST_CONF_PATH)) {
+            nimbusHost = environment.config().getString(STORM_NIMBUS_HOST_CONF_PATH);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_HOST_CONF_PATH,nimbusHost);
+        } else {
+            LOG.info("Using default {} = {}",STORM_NIMBUS_HOST_CONF_PATH,STORM_NIMBUS_HOST_DEFAULT);
+        }
+        Integer nimbusThriftPort =  STORM_NIMBUS_THRIFT_DEFAULT;
+        if(environment.config().hasPath(STORM_NIMBUS_THRIFT_CONF_PATH)) {
+            nimbusThriftPort = environment.config().getInt(STORM_NIMBUS_THRIFT_CONF_PATH);
+            LOG.info("Overriding {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,nimbusThriftPort);
+        } else {
+            LOG.info("Using default {} = {}",STORM_NIMBUS_THRIFT_CONF_PATH,STORM_NIMBUS_THRIFT_DEFAULT);
+        }
+        conf.put(backtype.storm.Config.NIMBUS_HOST, nimbusHost);
+        conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, nimbusThriftPort);
+        return conf;
+    }
+
+    /**
+     * Builds the topology for {@code executor} and submits it under the name stored at "appId".
+     * Mode CLUSTER submits through StormSubmitter; any other mode submits to the local cluster.
+     * @throws RuntimeException when Storm rejects the topology as already alive or invalid
+     */
+    @Override
+    public <Conf extends Configuration> void start(Application<Conf, StormEnvironment, StormTopology> executor, com.typesafe.config.Config config){
+        String topologyName = config.getString("appId");
+        Preconditions.checkNotNull(topologyName,"[appId] is required by null for "+executor.getClass().getCanonicalName());
+        StormTopology topology = executor.execute(config, environment);
+        LOG.info("Starting {} ({})",topologyName,executor.getClass().getCanonicalName());
+        Config conf = getStormConfig();
+        if(isClusterMode(config)){
+            // getString throws on a missing path instead of returning null, so probe with hasPath before falling back.
+            String jarFile = config.hasPath("jarPath") ? config.getString("jarPath") : DynamicJarPathFinder.findPath(executor.getClass());
+            synchronized (StormExecutionRuntime.class) {
+                System.setProperty("storm.jar", jarFile);
+                LOG.info("Submitting as cluster mode ...");
+                try {
+                    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology);
+                } catch (AlreadyAliveException | InvalidTopologyException e) {
+                    LOG.error(e.getMessage(), e);
+                    throw new RuntimeException(e.getMessage(),e);
+                } finally {
+                    System.clearProperty("storm.jar");
+                }
+            }
+        } else {
+            LOG.info("Submitting as local mode ...");
+            getLocalCluster().submitTopology(topologyName, conf, topology);
+            LOG.info("Submitted");
+        }
+    }
+
+    /** Kills the topology named by "appId": through Nimbus in CLUSTER mode, otherwise in the local cluster with a zero-second wait. */
+    @Override
+    public <Conf extends Configuration> void stop(Application<Conf,StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
+        String appId = config.getString("appId");
+        if(isClusterMode(config)){
+            Nimbus.Client stormClient = NimbusClient.getConfiguredClient(getStormConfig()).getClient();
+            try {
+                stormClient.killTopology(appId);
+            } catch (NotAliveException | TException e) {
+                // Failure is only logged; pass the exception itself (not its cause) so the stack trace is kept.
+                LOG.error("Failed to kill topology named {}, due to: {}",appId,e.getMessage(),e);
+            }
+        } else {
+            KillOptions killOptions = new KillOptions();
+            killOptions.set_wait_secs(0);
+            getLocalCluster().killTopologyWithOpts(appId,killOptions);
+        }
+    }
+
+    @Override
+    public <Conf extends Configuration> void status(Application<Conf,StormEnvironment, StormTopology> executor, com.typesafe.config.Config config) {
+        // TODO: Not implemented yet!
+        throw new RuntimeException("TODO: Not implemented yet!");
+    }
+
+    /** Compares the "mode" value by content; a reference comparison against the enum name would not match a string read from config. */
+    private static boolean isClusterMode(com.typesafe.config.Config config) {
+        return ApplicationEntity.Mode.CLUSTER.name().equals(config.getString("mode"));
+    }
+
+    public static class Provider implements ExecutionRuntimeProvider<StormEnvironment,StormTopology> {
+        @Override
+        public StormExecutionRuntime get() {
+            return new StormExecutionRuntime();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
index 7a0de82..90137e5 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
@@ -18,6 +18,7 @@ package org.apache.eagle.app.service;
 
 import com.google.common.base.Preconditions;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.Application;
 import org.apache.eagle.app.ApplicationLifecycle;
 import org.apache.eagle.app.Configuration;
@@ -45,7 +46,7 @@ import java.util.stream.Collectors;
  * </ul>
  */
 public class ApplicationContext implements Serializable, ApplicationLifecycle {
-    private final Configuration config;
+    private final Config config;
     private final Application application;
     private final ExecutionRuntime runtime;
     private final ApplicationEntity metadata;
@@ -54,19 +55,17 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle {
      * @param metadata ApplicationEntity
      * @param application Application
      */
-    public ApplicationContext(Application application, ApplicationEntity metadata, Config config){
+    public ApplicationContext(Application application, ApplicationEntity metadata, Config config1){
         Preconditions.checkNotNull(application,"Application is null");
         Preconditions.checkNotNull(metadata,"ApplicationEntity is null");
         this.application = application;
         this.metadata = metadata;
-        this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(),config);
+        this.runtime = ExecutionRuntimeManager.getInstance().getRuntime(application.getEnvironmentType(),config1);
         Map<String,Object> applicationConfig = metadata.getConfiguration();
         if(applicationConfig == null) {
             applicationConfig = Collections.emptyMap();
         }
-        this.config = ApplicationConfigHelper.convertFrom(applicationConfig,application.getConfigType());
-        this.config.setMode(metadata.getMode());
-        this.config.setAppId(metadata.getAppId());
+        this.config = ConfigFactory.parseMap(applicationConfig);
     }
 
     @Override
@@ -100,4 +99,4 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle {
     public ApplicationEntity getMetadata() {
         return metadata;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
index 88c9f4d..c98e7cc 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
@@ -20,19 +20,26 @@ import com.google.common.base.Preconditions;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.service.ApplicationContext;
 import org.apache.eagle.app.service.ApplicationOperations;
 import org.apache.eagle.app.service.ApplicationManagementService;
 import org.apache.eagle.app.service.ApplicationProviderService;
+import org.apache.eagle.app.spi.ApplicationProvider;
 import org.apache.eagle.metadata.exceptions.EntityNotFoundException;
 import org.apache.eagle.metadata.model.ApplicationDesc;
 import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.Property;
 import org.apache.eagle.metadata.model.SiteEntity;
 import org.apache.eagle.metadata.service.ApplicationEntityService;
 import org.apache.eagle.metadata.service.SiteEntityService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 @Singleton
 public class ApplicationManagementServiceImpl implements ApplicationManagementService {
     private final SiteEntityService siteEntityService;
@@ -53,6 +60,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         this.applicationEntityService = applicationEntityService;
     }
 
+    @Override
     public ApplicationEntity install(ApplicationOperations.InstallOperation operation) throws EntityNotFoundException {
         Preconditions.checkNotNull(operation.getSiteId(),"siteId is null");
         Preconditions.checkNotNull(operation.getAppType(),"appType is null");
@@ -63,16 +71,38 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         ApplicationEntity applicationEntity = new ApplicationEntity();
         applicationEntity.setDescriptor(appDesc);
         applicationEntity.setSite(siteEntity);
-        applicationEntity.setConfiguration(operation.getConfiguration());
+
+        /**
+         *  calculate application config based on
+         *   1) default values in metadata.xml
+         *   2) user's config value
+         *   3) some metadata, for example siteId, mode, appId
+         */
+        Map<String, Object> appConfig = new HashMap<>();
+        ApplicationProvider provider = applicationProviderService.getApplicationProviderByType(operation.getAppType());
+        List<Property> propertyList = provider.getApplicationDesc().getConfiguration().getProperties();
+        for(Property p : propertyList){
+            appConfig.put(p.getName(), p.getValue());
+        }
+        if(operation.getConfiguration() != null) {
+            appConfig.putAll(operation.getConfiguration());
+        }
+        appConfig.put("siteId", operation.getSiteId());
+        appConfig.put("mode", operation.getMode().name());
+        appConfig.put("appId", operation.getAppType());
+
+        applicationEntity.setConfiguration(appConfig);
         applicationEntity.setMode(operation.getMode());
         ApplicationContext applicationContext = new ApplicationContext(
                 applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
                 applicationEntity,config);
         applicationContext.onInstall();
         applicationEntityService.create(applicationEntity);
+
         return applicationEntity;
     }
 
+    @Override
     public ApplicationEntity uninstall(ApplicationOperations.UninstallOperation operation) {
         ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
         ApplicationContext applicationContext = new ApplicationContext(
@@ -88,6 +118,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         return applicationEntityService.delete(applicationEntity);
     }
 
+    @Override
     public ApplicationEntity start(ApplicationOperations.StartOperation operation) {
         ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
         ApplicationContext applicationContext = new ApplicationContext(
@@ -97,6 +128,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         return applicationEntity;
     }
 
+    @Override
     public ApplicationEntity stop(ApplicationOperations.StopOperation operation) {
         ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
         ApplicationContext applicationContext = new ApplicationContext(
@@ -105,4 +137,4 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
         applicationContext.onStop();
         return applicationEntity;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index dda58d6..e4adfd2 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -17,32 +17,60 @@
 package org.apache.eagle.app.sink;
 
 import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.tuple.Tuple;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
 import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.app.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.Properties;
 
 public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
-    private final static Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
+    private final static Logger LOG = LoggerFactory.getLogger(KafkaStreamSink.class);
     private String topicId;
+    private Producer producer;
+    private KafkaStreamSinkConfig config;
 
     @Override
     public void init(String streamId, KafkaStreamSinkConfig config) {
         super.init(streamId, config);
         this.topicId = config.getTopicId();
+        this.config = config;
     }
 
     @Override
     public void prepare(Map stormConf, TopologyContext context) {
         super.prepare(stormConf, context);
-        // TODO: Create KafkaProducer
+        Properties properties = new Properties();
+        properties.put("metadata.broker.list", config.getBrokerList());
+        properties.put("serializer.class", config.getSerializerClass());
+        properties.put("key.serializer.class", config.getKeySerializerClass());
+        ProducerConfig producerConfig = new ProducerConfig(properties);
+        producer = new Producer(producerConfig);
     }
 
     @Override
     protected void onEvent(StreamEvent streamEvent) {
-        LOGGER.info("TODO: producing {} to '{}'",streamEvent,topicId);
+    }
+
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        LOG.info("TODO: producing {} to '{}'", input, topicId);
+
+        try {
+            Map m = (Map) input.getValue(1);
+            String output = new ObjectMapper().writeValueAsString(m);
+            producer.send(new KeyedMessage(this.topicId, m.get("user"), output));
+        }catch(Exception ex){
+            LOG.error("", ex);
+            collector.reportError(ex);
+        }
     }
 
     @Override
@@ -51,11 +79,11 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
     }
 
     private void ensureTopicCreated(){
-        LOGGER.info("TODO: ensure kafka topic {} created",this.topicId);
+        LOG.info("TODO: ensure kafka topic {} created",this.topicId);
     }
 
     private void ensureTopicDeleted(){
-        LOGGER.info("TODO: ensure kafka topic {} deleted",this.topicId);
+        LOG.info("TODO: ensure kafka topic {} deleted",this.topicId);
     }
 
     @Override
@@ -65,12 +93,12 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
 
     public static class Provider implements StreamSinkProvider<KafkaStreamSink,KafkaStreamSinkConfig> {
         @Override
-        public KafkaStreamSinkConfig getSinkConfig(String streamId, Configuration appConfig) {
-            String topicId = String.format("EAGLE.%s.%s",
-                    appConfig.getSiteId(),
-                    streamId).toLowerCase();
+        public KafkaStreamSinkConfig getSinkConfig(String streamId, Config config) {
             KafkaStreamSinkConfig desc = new KafkaStreamSinkConfig();
-            desc.setTopicId(topicId);
+            desc.setTopicId(config.getString("dataSinkConfig.topic"));
+            desc.setBrokerList(config.getString("dataSinkConfig.brokerList"));
+            desc.setSerializerClass(config.getString("dataSinkConfig.serializerClass"));
+            desc.setKeySerializerClass(config.getString("dataSinkConfig.keySerializerClass"));
             return desc;
         }
 
@@ -79,4 +107,4 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
             return new KafkaStreamSink();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
index 17a3aa8..9d6a0ab 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSinkConfig.java
@@ -20,6 +20,9 @@ import org.apache.eagle.metadata.model.StreamSinkConfig;
 
 public class KafkaStreamSinkConfig implements StreamSinkConfig {
     private String topicId;
+    private String brokerList;
+    private String serializerClass;
+    private String keySerializerClass;
 
     public String getTopicId() {
         return topicId;
@@ -29,6 +32,30 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig {
         this.topicId = topicId;
     }
 
+    public String getBrokerList() {
+        return brokerList;
+    }
+
+    public void setBrokerList(String brokerList) {
+        this.brokerList = brokerList;
+    }
+
+    public String getSerializerClass() {
+        return serializerClass;
+    }
+
+    public void setSerializerClass(String serializerClass) {
+        this.serializerClass = serializerClass;
+    }
+
+    public String getKeySerializerClass() {
+        return keySerializerClass;
+    }
+
+    public void setKeySerializerClass(String keySerializerClass) {
+        this.keySerializerClass = keySerializerClass;
+    }
+
     @Override
     public String getType() {
         return "KAFKA";
@@ -43,4 +70,4 @@ public class KafkaStreamSinkConfig implements StreamSinkConfig {
     public Class<? extends StreamSinkConfig> getConfigType() {
         return KafkaStreamSinkConfig.class;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
index 3efd811..0d835d6 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.app.sink;
 
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.model.StreamEvent;
 import org.apache.eagle.app.Configuration;
@@ -43,7 +44,7 @@ public class LoggingStreamSink extends StormStreamSink<DefaultStreamSinkConfig>
 
     public static class Provider implements StreamSinkProvider<LoggingStreamSink,DefaultStreamSinkConfig> {
         @Override
-        public DefaultStreamSinkConfig getSinkConfig(String streamId, Configuration appConfig) {
+        public DefaultStreamSinkConfig getSinkConfig(String streamId, Config config) {
             return new DefaultStreamSinkConfig(LoggingStreamSink.class);
         }
 
@@ -52,4 +53,4 @@ public class LoggingStreamSink extends StormStreamSink<DefaultStreamSinkConfig>
             return new LoggingStreamSink();
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/660bfbd3/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
index 1b35a99..6765443 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StormStreamSink.java
@@ -85,4 +85,4 @@ public abstract class StormStreamSink<K extends StreamSinkConfig> extends BaseBa
     public String getStreamId() {
         return streamId;
     }
-}
\ No newline at end of file
+}