You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by mw...@apache.org on 2016/07/25 09:36:39 UTC

[02/47] incubator-eagle git commit: EAGLE-271 Topology management in remote/local mode including start/stop operations

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java
index 9687cad..8975051 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/criteria/impl/QueryCriteriaBuilder.java
@@ -107,7 +107,7 @@ public class QueryCriteriaBuilder implements CriteriaBuilder {
         root.addFrom(this.tableName);
 
         // WHERE timestamp in time range
-        Criterion where = new Criterion(new ColumnImpl(this.tableName, JdbcConstants.TIMESTAMP_COLUMN_NAME),query.getStartTime(), SqlEnum.GREATER_EQUAL)
+        Criterion where = new Criterion(new ColumnImpl(this.tableName, JdbcConstants.TIMESTAMP_COLUMN_NAME), query.getStartTime(), SqlEnum.GREATER_EQUAL)
                         .and(new Criterion(new ColumnImpl(this.tableName, JdbcConstants.TIMESTAMP_COLUMN_NAME),query.getEndTime(), SqlEnum.LESS_THAN));
         ORExpression expression = searchCondition.getQueryExpression();
         if(expression!=null){

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java
index 15810b2..b98d881 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/main/java/org/apache/eagle/storage/jdbc/schema/JdbcEntityDefinitionManager.java
@@ -147,7 +147,7 @@ public class JdbcEntityDefinitionManager {
     }
 
     //================================================
-    // Intially bind basic java types with SQL types
+    // Initially bind basic java types with SQL types
     //================================================
     static {
         registerJdbcType(String.class, Types.LONGVARCHAR);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/pom.xml b/eagle-core/pom.xml
index 7debe69..a236739 100644
--- a/eagle-core/pom.xml
+++ b/eagle-core/pom.xml
@@ -42,5 +42,6 @@
    		<module>eagle-machinelearning</module>
         <module>eagle-embed</module>
         <module>eagle-metric</module>
+        <module>eagle-application-management</module>
     </modules>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/admin-page.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/admin-page.png b/eagle-docs/images/appManager/admin-page.png
new file mode 100644
index 0000000..961487c
Binary files /dev/null and b/eagle-docs/images/appManager/admin-page.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/start-topology-1.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/start-topology-1.png b/eagle-docs/images/appManager/start-topology-1.png
new file mode 100644
index 0000000..29274a0
Binary files /dev/null and b/eagle-docs/images/appManager/start-topology-1.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/start-topology-2.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/start-topology-2.png b/eagle-docs/images/appManager/start-topology-2.png
new file mode 100644
index 0000000..440aec1
Binary files /dev/null and b/eagle-docs/images/appManager/start-topology-2.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/stop-topology-1.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/stop-topology-1.png b/eagle-docs/images/appManager/stop-topology-1.png
new file mode 100644
index 0000000..4b28192
Binary files /dev/null and b/eagle-docs/images/appManager/stop-topology-1.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/stop-topology-2.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/stop-topology-2.png b/eagle-docs/images/appManager/stop-topology-2.png
new file mode 100644
index 0000000..88f9f6d
Binary files /dev/null and b/eagle-docs/images/appManager/stop-topology-2.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/stop-topology-3.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/stop-topology-3.png b/eagle-docs/images/appManager/stop-topology-3.png
new file mode 100644
index 0000000..3f593b2
Binary files /dev/null and b/eagle-docs/images/appManager/stop-topology-3.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/topology-configuration-1.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/topology-configuration-1.png b/eagle-docs/images/appManager/topology-configuration-1.png
new file mode 100644
index 0000000..fae6b84
Binary files /dev/null and b/eagle-docs/images/appManager/topology-configuration-1.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/topology-configuration-2.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/topology-configuration-2.png b/eagle-docs/images/appManager/topology-configuration-2.png
new file mode 100644
index 0000000..1d333a5
Binary files /dev/null and b/eagle-docs/images/appManager/topology-configuration-2.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/topology-configuration-save.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/topology-configuration-save.png b/eagle-docs/images/appManager/topology-configuration-save.png
new file mode 100644
index 0000000..45ca922
Binary files /dev/null and b/eagle-docs/images/appManager/topology-configuration-save.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/topology-description.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/topology-description.png b/eagle-docs/images/appManager/topology-description.png
new file mode 100644
index 0000000..57ac504
Binary files /dev/null and b/eagle-docs/images/appManager/topology-description.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/topology-execution.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/topology-execution.png b/eagle-docs/images/appManager/topology-execution.png
new file mode 100644
index 0000000..6b20bdc
Binary files /dev/null and b/eagle-docs/images/appManager/topology-execution.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/images/appManager/topology-monitor.png
----------------------------------------------------------------------
diff --git a/eagle-docs/images/appManager/topology-monitor.png b/eagle-docs/images/appManager/topology-monitor.png
new file mode 100644
index 0000000..bd007be
Binary files /dev/null and b/eagle-docs/images/appManager/topology-monitor.png differ

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/tutorial/application_manager_tutorial.md
----------------------------------------------------------------------
diff --git a/eagle-docs/tutorial/application_manager_tutorial.md b/eagle-docs/tutorial/application_manager_tutorial.md
new file mode 100644
index 0000000..5083111
--- /dev/null
+++ b/eagle-docs/tutorial/application_manager_tutorial.md
@@ -0,0 +1,109 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
+> Application manager aims to manage topology status on EAGLE UI. Users can easily start/stop topologies remotely or locally without shell commands.
+
+This tutorial will go through all parts of the application manager and then give an example of how to use it.
+
+### Design
+Briefly speaking, Application manager consists of a daemon scheduler and the application manager. The scheduler loads user operations (start/stop), and the application manager is responsible for executing these operations. For more details, please refer to [here](https://cwiki.apache.org/confluence/display/EAG/Application+Management)
+
+### Manual
+
+#### Step 1: configure the scheduler
+The configuration file `eagle-scheduler.conf` defines scheduler parameters, topology execution platform settings and parts of topology settings. Here are some important ones:
+
+* `envContextConfig.env`
+
+   application execution platform. Default value is storm
+   
+* `envContextConfig.url`
+   
+   execution platform http url. Default is "http://sandbox.hortonworks.com:8744"
+   
+* `envContextConfig.nimbusHost`
+  
+   Storm nimbus host. Default is "sandbox.hortonworks.com"
+   
+* `envContextConfig.nimbusThriftPort`
+   
+   Default is 6627
+   
+* `envContextConfig.jarFile`
+
+   Storm fat jar path. This is the only setting users must specify, e.g. "/dir-to-jar/eagle-topology-0.3.0-incubating-assembly.jar"
+   
+After the configuration is ready, start Eagle service `bin/eagle-service.sh start`. 
+  
+#### Step 2: add topologies on UI
+1. First of all, go to admin page 
+   ![admin page](/images/appManager/admin-page.png)
+   ![admin page](/images/appManager/topology-monitor.png)
+    
+2. Go to management page, and create a topology description. There are three required fields
+    * name: topology name
+    * type: topology type [CLASS, DYNAMIC]
+    * execution entry: either the class which implements the interface TopologyExecutable or an eagle [DSL](https://github.com/apache/incubator-eagle/blob/master/eagle-assembly/src/main/conf/sandbox-hadoopjmx-pipeline.conf) based topology definition
+   ![admin page](/images/appManager/topology-description.png)
+   
+3. Back to monitoring page, and choose the site/application to deploy the topology 
+   ![admin page](/images/appManager/topology-execution.png)
+   
+4. Go to site page, and edit site->application and add some new configurations. Below are some example configurations for [site=sandbox, application=hbaseSecurityLog]
+   `These configurations have a higher priority than those in eagle-scheduler.conf`
+   
+           classification.hbase.zookeeper.property.clientPort=2181
+           classification.hbase.zookeeper.quorum=sandbox.hortonworks.com
+           # platform related configurations
+           app.envContextConfig.env=storm
+           app.envContextConfig.mode=cluster
+           # data source related configurations
+           app.dataSourceConfig.topic=sandbox_hbase_security_log
+           app.dataSourceConfig.zkConnection=sandbox.hortonworks.com:2181
+           app.dataSourceConfig.zkConnectionTimeoutMS=15000
+           app.dataSourceConfig.brokerZkPath=/brokers
+           app.dataSourceConfig.fetchSize=1048586
+           app.dataSourceConfig.transactionZKServers=sandbox.hortonworks.com
+           app.dataSourceConfig.transactionZKPort=2181
+           app.dataSourceConfig.transactionZKRoot=/consumers
+           app.dataSourceConfig.consumerGroupId=eagle.hbasesecurity.consumer
+           app.dataSourceConfig.transactionStateUpdateMS=2000
+           app.dataSourceConfig.deserializerClass=org.apache.eagle.security.hbase.parse.HbaseAuditLogKafkaDeserializer
+           # service related configurations
+           app.eagleProps.site=sandbox
+           app.eagleProps.application=hbaseSecurityLog
+           app.eagleProps.dataJoinPollIntervalSec=30
+           app.eagleProps.mailHost=atom.corp.ebay.com
+           app.eagleProps.mailSmtpPort=25
+           app.eagleProps.mailDebug=true
+           app.eagleProps.eagleService.host=localhost
+           app.eagleProps.eagleService.port=9099
+           app.eagleProps.eagleService.username=admin
+           app.eagleProps.eagleService.password=secret
+   ![admin page](/images/appManager/topology-configuration-1.png)
+   ![admin page](/images/appManager/topology-configuration-2.png)
+   
+5. Go to monitoring page, and start topologies
+   ![admin page](/images/appManager/start-topology-1.png)
+   ![admin page](/images/appManager/start-topology-2.png)
+   
+6. stop topologies on monitoring page
+   ![admin page](/images/appManager/stop-topology-1.png)
+   ![admin page](/images/appManager/stop-topology-2.png)
+   ![admin page](/images/appManager/stop-topology-3.png)
+
+ 
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-docs/tutorial/getting_started_with_eagle.md
----------------------------------------------------------------------
diff --git a/eagle-docs/tutorial/getting_started_with_eagle.md b/eagle-docs/tutorial/getting_started_with_eagle.md
index 37e0741..0e83a01 100644
--- a/eagle-docs/tutorial/getting_started_with_eagle.md
+++ b/eagle-docs/tutorial/getting_started_with_eagle.md
@@ -1,5 +1,4 @@
 <!--
-{% comment %}
 Licensed to the Apache Software Foundation (ASF) under one or more
 contributor license agreements.  See the NOTICE file distributed with
 this work for additional information regarding copyright ownership.
@@ -14,5 +13,4 @@ distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
-{% endcomment %}
 -->
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-hadoop-metric/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml
index fda911f..24f7a3b 100644
--- a/eagle-hadoop-metric/pom.xml
+++ b/eagle-hadoop-metric/pom.xml
@@ -32,6 +32,11 @@
             <artifactId>eagle-stream-process-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-stream-application-manager</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java
new file mode 100644
index 0000000..044a48f
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.hadoop.metric;
+
+
+import com.typesafe.config.Config;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.core.StreamProducer;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+import org.apache.eagle.stream.application.TopologyExecutable;
+
+public class HadoopJmxMetricMonitoringTopology implements TopologyExecutable {
+    @Override
+    public void submit(String topology, Config config) {
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
+        String streamName = "hadoopJmxMetricEventStream";
+        StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName);
+        sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor");
+        env.execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh b/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh
index dbcb9b8..849f462 100755
--- a/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh
+++ b/eagle-hadoop-metric/src/main/resources/hadoop-metric-init.sh
@@ -34,7 +34,7 @@ curl -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:a
            "application":"hadoopJmxMetricDataSource"
         },
         "enabled": true,
-        "config": "{\"druid\": {\"coordinator\": \"coordinatorHost:port\", \"broker\": \"brokerHost:port\"}}"
+        "config": "web.druid.coordinator=coordinatorHost:port\nweb.druid.broker=brokerHost:port"
      }
   ]
   '

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/resolver/MetadataAccessConfigRepo.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/resolver/MetadataAccessConfigRepo.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/resolver/MetadataAccessConfigRepo.java
index fa178c9..1292fb8 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/resolver/MetadataAccessConfigRepo.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/resolver/MetadataAccessConfigRepo.java
@@ -19,9 +19,7 @@
 package org.apache.eagle.security.resolver;
 
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValue;
+import com.typesafe.config.*;
 import org.apache.eagle.alert.entity.SiteApplicationServiceEntity;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
@@ -47,11 +45,14 @@ public class MetadataAccessConfigRepo {
         if (list == null || list.size() == 0)
             throw new Exception("Config is empty for site=" + siteId +" application=" + application + ".");
         String originalConfigStr = list.get(0).getConfig();
-        Config originalConfig = ConfigFactory.parseString(originalConfigStr);
-        if(!originalConfig.hasPath(EagleConfigConstants.WEB_CONFIG)) {
-            throw new Exception("Fail to get WEB_CONFIG configurations for data classification");
+        ConfigParseOptions options = ConfigParseOptions.defaults()
+                .setSyntax(ConfigSyntax.PROPERTIES)
+                .setAllowMissing(false);
+        Config originalConfig = ConfigFactory.parseString(originalConfigStr, options);
+        if(!originalConfig.hasPath(EagleConfigConstants.CLASSIFICATION_CONFIG)) {
+            throw new Exception("Fail to get CLASSIFICATION_CONFIG for data classification");
         }
-        return originalConfig.getConfig(EagleConfigConstants.WEB_CONFIG);
+        return originalConfig.getConfig(EagleConfigConstants.CLASSIFICATION_CONFIG);
     }
 
     public Configuration convert(Config originalConfig) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/util/AbstractResourceSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/util/AbstractResourceSensitivityPollingJob.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/util/AbstractResourceSensitivityPollingJob.java
index 2d4af7f..8cd1317 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/util/AbstractResourceSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/util/AbstractResourceSensitivityPollingJob.java
@@ -36,7 +36,7 @@ public class AbstractResourceSensitivityPollingJob {
     protected <R extends TaggedLogAPIEntity> List<R> load(JobDataMap jobDataMap, String service) throws Exception {
         Map<String, Object> map = (Map<String, Object>) jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
         String eagleServiceHost = (String) map.get(EagleConfigConstants.HOST);
-        Integer eagleServicePort = (Integer) map.get(EagleConfigConstants.PORT);
+        Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
         String username = map.containsKey(EagleConfigConstants.USERNAME) ? (String) map.get(EagleConfigConstants.USERNAME) : null;
         String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String) map.get(EagleConfigConstants.PASSWORD) : null;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/util/ExternalDataJoiner.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/util/ExternalDataJoiner.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/util/ExternalDataJoiner.java
index 79a44c0..e5e4ae0 100644
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/util/ExternalDataJoiner.java
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/util/ExternalDataJoiner.java
@@ -19,6 +19,7 @@ package org.apache.eagle.security.util;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.eagle.common.config.EagleConfigConstants;
 import org.quartz.Job;
 import org.quartz.JobBuilder;
 import org.quartz.JobDataMap;
@@ -77,19 +78,20 @@ public class ExternalDataJoiner {
 	
 	public void start(){
 		// for job
-		String group = String.format("%s.%s.%s", QUARTZ_GROUP_NAME, jobDataMap.getString("site"), jobDataMap.getString("dataSource"));
+		String group = String.format("%s.%s.%s", QUARTZ_GROUP_NAME, jobDataMap.getString(EagleConfigConstants.SITE), jobDataMap.getString(EagleConfigConstants.APPLICATION));
 		JobDetail job = JobBuilder.newJob(jobCls)
 		     .withIdentity(jobCls.getName() + ".job", group)
-		     .setJobData(jobDataMap)
+		     .usingJobData(jobDataMap)
 		     .build();
 		
 		// for trigger
 		Object interval = jobDataMap.get(DATA_JOIN_POLL_INTERVALSEC);
+        int dataJoinPollIntervalSec = (interval == null ? defaultIntervalSeconds : Integer.parseInt(interval.toString()));
 		Trigger trigger = TriggerBuilder.newTrigger() 
 			  .withIdentity(jobCls.getName() + ".trigger", QUARTZ_GROUP_NAME) 
 		      .startNow() 
 		      .withSchedule(SimpleScheduleBuilder.simpleSchedule() 
-		          .withIntervalInSeconds(interval == null ? defaultIntervalSeconds : ((Integer)interval).intValue()) 
+		          .withIntervalInSeconds(dataJoinPollIntervalSec)
 		          .repeatForever()) 
 		      .build(); 
 		try{

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-common/src/test/java/org/apache/eagle/security/crawler/audit/TestMetaDataAccessConfigRepo.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/test/java/org/apache/eagle/security/crawler/audit/TestMetaDataAccessConfigRepo.java b/eagle-security/eagle-security-common/src/test/java/org/apache/eagle/security/crawler/audit/TestMetaDataAccessConfigRepo.java
index 146e757..3306c40 100644
--- a/eagle-security/eagle-security-common/src/test/java/org/apache/eagle/security/crawler/audit/TestMetaDataAccessConfigRepo.java
+++ b/eagle-security/eagle-security-common/src/test/java/org/apache/eagle/security/crawler/audit/TestMetaDataAccessConfigRepo.java
@@ -19,9 +19,7 @@
 package org.apache.eagle.security.crawler.audit;
 
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValue;
+import com.typesafe.config.*;
 import junit.framework.Assert;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.junit.Test;
@@ -32,24 +30,35 @@ public class TestMetaDataAccessConfigRepo {
 
     @Test
     public void testStringToConfig() {
-        String hdfsConfigStr = "web.fs.defaultFS: \"hdfs://sandbox.hortonworks.com:8020\"";
-        Config config = ConfigFactory.parseString(hdfsConfigStr);
-        Assert.assertTrue(config.hasPath(EagleConfigConstants.WEB_CONFIG));
+        String hdfsConfigStr = "classification.fs.defaultFS=hdfs://sandbox.hortonworks.com:8020";
+        ConfigParseOptions options = ConfigParseOptions.defaults()
+                .setSyntax(ConfigSyntax.PROPERTIES)
+                .setAllowMissing(false);
+        Config config = ConfigFactory.parseString(hdfsConfigStr, options);
+        Assert.assertTrue(config.hasPath(EagleConfigConstants.CLASSIFICATION_CONFIG));
 
-        String hiveConfigStr = "web.accessType:\"metastoredb_jdbc\",web.password:\"hive\",web.user:\"hive\",web.jdbcDriverClassName:\"com.mysql.jdbc.Driver\",web.jdbcUrl:\"jdbc:mysql://sandbox.hortonworks.com/hive?createDatabaseIfNotExist=true\"";
-        config = ConfigFactory.parseString(hiveConfigStr);
+        String hiveConfigStr = "classification.accessType=metastoredb_jdbc\nclassification.password=hive\nclassification.user=hive\nclassification.jdbcDriverClassName=com.mysql.jdbc.Driver\nclassification.jdbcUrl=jdbc:mysql://sandbox.hortonworks.com/hive?createDatabaseIfNotExist=true";
+        config = ConfigFactory.parseString(hiveConfigStr, options);
         Config hiveConfig = null;
-        if(config.hasPath(EagleConfigConstants.WEB_CONFIG)) {
-            hiveConfig = config.getConfig(EagleConfigConstants.WEB_CONFIG);
+        if(config.hasPath(EagleConfigConstants.CLASSIFICATION_CONFIG)) {
+            hiveConfig = config.getConfig(EagleConfigConstants.CLASSIFICATION_CONFIG);
             Assert.assertTrue(hiveConfig.getString("accessType").equals("metastoredb_jdbc"));
         }
 
-        String hbaseConfigStr = "web.hbase.zookeeper.property.clientPort: \"2181\", web.hbase.zookeeper.quorum: \"localhost\"";
-        config = ConfigFactory.parseString(hbaseConfigStr);
+        String hbaseConfigStr = "classification.hbase.zookeeper.property.clientPort=2181\nclassification.hbase.zookeeper.quorum=localhost";
+        config = ConfigFactory.parseString(hbaseConfigStr, options);
         Config hbaseConfig = null;
-        if(config.hasPath(EagleConfigConstants.WEB_CONFIG)) {
-            hbaseConfig = config.getConfig(EagleConfigConstants.WEB_CONFIG);
+        if(config.hasPath(EagleConfigConstants.CLASSIFICATION_CONFIG)) {
+            hbaseConfig = config.getConfig(EagleConfigConstants.CLASSIFICATION_CONFIG);
             Assert.assertTrue(hbaseConfig.getString("hbase.zookeeper.property.clientPort").equals("2181"));
         }
+
+        String appConfigStr = "classification.hbase.zookeeper.property.clientPort=2181\nclassification.hbase.zookeeper.quorum=sandbox.hortonworks.com\n\napp.envContextConfig.env=storm\napp.envContextConfig.mode=cluster\napp.dataSourceConfig.topic=sandbox_hbase_security_log\napp.dataSourceConfig.zkConnection=127.0.0.1:2181\napp.dataSourceConfig.zkConnectionTimeoutMS=15000\napp.dataSourceConfig.brokerZkPath=/brokers\napp.dataSourceConfig.fetchSize=1048586\napp.dataSourceConfig.transactionZKServers=127.0.0.1\napp.dataSourceConfig.transactionZKPort=2181\napp.dataSourceConfig.transactionZKRoot=/consumers\napp.dataSourceConfig.consumerGroupId=eagle.hbasesecurity.consumer\napp.dataSourceConfig.transactionStateUpdateMS=2000\napp.dataSourceConfig.deserializerClass=org.apache.eagle.security.hbase.parse.HbaseAuditLogKafkaDeserializer\napp.eagleProps.site=sandbox\napp.eagleProps.application=hbaseSecurityLog\napp.eagleProps.dataJoinPollIntervalSec=30\napp.eagleProps.mailHost=mailHost.com\napp.eagleProps.mailSmtpPort=25\napp.eagleProps.mailDebug=true\napp.eagleProps.eagleService.host=localhost\napp.eagleProps.eagleService.port=9099\napp.eagleProps.eagleService.username=admin\napp.eagleProps.eagleService.password=secret";
+        config = ConfigFactory.parseString(appConfigStr, options);
+        Config appConfig = null;
+        if(config.hasPath(EagleConfigConstants.APP_CONFIG)) {
+            appConfig = config.getConfig(EagleConfigConstants.APP_CONFIG);
+            Assert.assertTrue(appConfig.getString("envContextConfig.mode").equals("cluster"));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hbase-securitylog/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/pom.xml b/eagle-security/eagle-security-hbase-securitylog/pom.xml
index 1f4bfc8..9e8fd72 100644
--- a/eagle-security/eagle-security-hbase-securitylog/pom.xml
+++ b/eagle-security/eagle-security-hbase-securitylog/pom.xml
@@ -36,5 +36,10 @@
             <artifactId>eagle-security-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-stream-application-manager</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringTopology.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringTopology.java b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringTopology.java
new file mode 100644
index 0000000..0077912
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/HbaseAuditLogMonitoringTopology.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.hbase;
+
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+import org.apache.eagle.security.hbase.sensitivity.HbaseResourceSensitivityDataJoinExecutor;
+import org.apache.eagle.stream.application.TopologyExecutable;
+
+
+public class HbaseAuditLogMonitoringTopology implements TopologyExecutable {
+    @Override
+    public void submit(String application, Config config) {
+        // Builds the topology from config exactly as received (no ConfigFactory.load() fallback is merged in);
+        // the application argument is not used here. Pipeline: Kafka spout -> data-join flatMap -> alert consumer.
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
+        env.fromSpout(new KafkaSourcedSpoutProvider()).withOutputFields(1).nameAs("kafkaMsgConsumer")
+                .flatMap(new HbaseResourceSensitivityDataJoinExecutor())
+                .alertWithConsumer("hbaseSecurityLogEventStream", "hbaseSecurityLogAlertExecutor");
+        env.execute();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/parse/HbaseAuditLogKafkaDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/parse/HbaseAuditLogKafkaDeserializer.java b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/parse/HbaseAuditLogKafkaDeserializer.java
index 6d6f5c4..ff98ed9 100644
--- a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/parse/HbaseAuditLogKafkaDeserializer.java
+++ b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/parse/HbaseAuditLogKafkaDeserializer.java
@@ -40,26 +40,23 @@ public class HbaseAuditLogKafkaDeserializer implements SpoutKafkaMessageDeserial
         String logLine = new String(arg0);
 
         HbaseAuditLogParser parser = new HbaseAuditLogParser();
-        HbaseAuditLogObject entity = null;
         try{
-            entity = parser.parse(logLine);
+            HbaseAuditLogObject entity = parser.parse(logLine);
+            if(entity == null) return null;
+
+            Map<String, Object> map = new TreeMap<String, Object>();
+            map.put("action", entity.action);
+            map.put("host", entity.host);
+            map.put("status", entity.status);
+            map.put("request", entity.request);
+            map.put("scope", entity.scope);
+            map.put("user", entity.user);
+            map.put("timestamp", entity.timestamp);
+            return map;
         }catch(Exception ex){
-            LOG.error("Failing parse audit log message", ex);
-        }
-        if(entity == null){
-            LOG.warn("Event ignored as it can't be correctly parsed, the log is ", logLine);
+            LOG.error("Failing parse audit log:" + logLine, ex);
             return null;
         }
-        Map<String, Object> map = new TreeMap<String, Object>();
-        map.put("action", entity.action);
-        map.put("host", entity.host);
-        map.put("status", entity.status);
-        map.put("request", entity.request);
-        map.put("scope", entity.scope);
-        map.put("user", entity.user);
-        map.put("timestamp", entity.timestamp);
-
-        return map;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/parse/HbaseAuditLogParser.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/parse/HbaseAuditLogParser.java b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/parse/HbaseAuditLogParser.java
index 6fdb03f..0504ed0 100644
--- a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/parse/HbaseAuditLogParser.java
+++ b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/parse/HbaseAuditLogParser.java
@@ -18,6 +18,7 @@
 package org.apache.eagle.security.hbase.parse;
 
 import java.io.Serializable;
+import java.text.ParseException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
@@ -36,111 +37,72 @@ public class HbaseAuditLogParser implements Serializable {
     private final static int LOGDATE_INDEX = 1;
     private final static int LOGLEVEL_INDEX = 2;
     private final static int LOGATTRS_INDEX = 3;
-    private final static String LOGDATE="logdate";
-    private final static String LOGLEVEL="loglevel";
-    private final static String CONTROLLER = "SecurityLogger.org.apache.hadoop.hbase.security.access.AccessController";
-    private final static String REASON = "reason";
-    private final static String ADDRESS = "address";
-    private final static String REQUEST = "request";
     private final static String ALLOWED = "allowed";
     private final static String DENIED = "denied";
-    private final static String USER = "user";
-    private final static String SCOPE = "scope";
-    private final static String FAMILY = "family";
-    private final static String ACTION = "action";
     private final static Pattern loggerPattern = Pattern.compile("^([\\d\\s\\-:,]+)\\s+(\\w+)\\s+(.*)");
-    private final static Pattern loggerAttributesPattern = Pattern.compile("([\\w\\.]+:[/\\w\\.\\s\\\\]+);\\s+");
-    private final static Pattern loggerContextPattern = Pattern.compile("\\((.*)\\)");
+    private final static Pattern loggerContextPattern = Pattern.compile("\\w+:\\s*\\(user=(.*),\\s*scope=(.*),\\s*family=(.*),\\s*action=(.*)\\)");
     private final static Pattern allowedPattern = Pattern.compile(ALLOWED);
 
 
-    public HbaseAuditLogObject parse(String logLine) throws Exception {
-        HbaseAuditLogObject ret = new HbaseAuditLogObject();
-        Map<String, String> auditMap = parseAudit(logLine);
-        if(auditMap == null) return null;
-
-        String status = auditMap.get(CONTROLLER);
-        if(StringUtils.isNotEmpty(status)) {
-            ret.status = allowedPattern.matcher(status).find() ? ALLOWED : DENIED;
-        }
-
-        String scope = auditMap.get(SCOPE);
-        String family = auditMap.get(FAMILY);
-        if(StringUtils.isNotEmpty(family)) {
-            if(!scope.contains(":")) scope = "default:" + scope;
-            scope = String.format("%s:%s", scope, family);
-        }
-        String ip = auditMap.get(ADDRESS);
-        if(StringUtils.isNotEmpty(ip)) {
-            ret.host = ip.substring(1);
-        }
-        ret.scope = scope;
-        ret.action = auditMap.get(ACTION);
-        ret.user = LogParseUtil.parseUserFromUGI(auditMap.get(USER));
-        ret.request = auditMap.get(REQUEST);
-        ret.timestamp = DateTimeUtil.humanDateToMilliseconds(auditMap.get(LOGDATE));
-        return ret;
-    }
-
-    Map<String, String> parseContext(String logLine) {
-        Matcher loggerMatcher = loggerContextPattern.matcher(logLine);
-        Map<String, String> ret = new HashMap<>();
-        if(loggerMatcher.find()) {
-            String context = loggerMatcher.group(1);
-            String [] kvs = context.split(",");
-            for(String kv : kvs){
-                String [] vals = kv.split("=");
-                if(vals.length > 1) {
-                    ret.put(vals[0].trim(), vals[1].trim());
-                } else {
-                    ret.put(vals[0].trim(), "");
-                }
-            }
-        }
-        return ret;
-    }
-
-    Map<String, String> parseAttribute(String logLine) {
-        Map<String, String> ret = new HashMap<>();
-        Matcher loggerMatcher = loggerAttributesPattern.matcher(logLine);
-        while(loggerMatcher.find()) {
-            String kv = loggerMatcher.group(1);
-            String[] kvs = kv.split(":");
-            if(kvs.length > 1) {
-                ret.put(kvs[0].trim(), kvs[1].trim());
-            } else {
-                ret.put(kvs[0].trim(), "");
-            }
-        }
-        return ret;
-    }
+    public HbaseAuditLogObject parse(String logLine) {
+        if(logLine == null || logLine.isEmpty()) return null;
 
-    Map<String, String> parseAudit(String logLine) {
-        Map<String, String> ret = null;
+        HbaseAuditLogObject ret = new HbaseAuditLogObject();
+        String timestamp = "";
+        String user = "";
+        String scope = "";
+        String action = "";
+        String ip = "";
+        String request = "";
+        String family = "";
+        String context = "";
 
         Matcher loggerMatcher = loggerPattern.matcher(logLine);
         if(loggerMatcher.find()) {
             try {
-                ret = new HashMap<>();
-                ret.put(LOGDATE, loggerMatcher.group(LOGDATE_INDEX));
-                ret.put(LOGLEVEL, loggerMatcher.group(LOGLEVEL_INDEX));
-                String logAttr = loggerMatcher.group(LOGATTRS_INDEX);
-                Map<String, String> attrs = parseAttribute(logAttr);
-                ret.put(CONTROLLER, attrs.get(CONTROLLER));
-                ret.put(REASON, attrs.get(REASON));
-                ret.put(ADDRESS, attrs.get(ADDRESS));
-                ret.put(REQUEST, attrs.get(REQUEST));
-                Map<String, String> contextMap = parseContext(logAttr);
-                ret.put(USER, contextMap.get(USER));
-                ret.put(SCOPE, contextMap.get(SCOPE));
-                ret.put(FAMILY, contextMap.get(FAMILY));
-                ret.put(ACTION, contextMap.get(ACTION));
-            } catch(IndexOutOfBoundsException e) {
+                timestamp = loggerMatcher.group(LOGDATE_INDEX);
+                String [] attrs = loggerMatcher.group(LOGATTRS_INDEX).split(";");
+                ret.status = allowedPattern.matcher(attrs[0]).find() ? ALLOWED : DENIED;
+                try {
+                    ip = attrs[2].split(":")[1].trim();
+                } catch (Exception e) {
+                    ip = "";
+                }
+                try {
+                    request = attrs[3].split(":")[1].trim();
+                } catch (Exception e) {
+                    request = "";
+                }
+                try {
+                    context = attrs[4].trim();
+                } catch (Exception e) {
+                    context = "";
+                }
+                Matcher contextMatcher = loggerContextPattern.matcher(context);
+                if(contextMatcher.find()) {
+                    user = contextMatcher.group(1);
+                    scope = contextMatcher.group(2);
+                    family = contextMatcher.group(3);
+                    action = contextMatcher.group(4);
+                }
+                if(StringUtils.isNotEmpty(family)) {
+                    if(!scope.contains(":")) scope = "default:" + scope;
+                    scope = String.format("%s:%s", scope, family);
+                }
+                if(StringUtils.isNotEmpty(ip)) {
+                    ret.host = ip.substring(1);
+                }
+                ret.timestamp = DateTimeUtil.humanDateToMilliseconds(timestamp);
+                ret.scope = scope;
+                ret.action = action;
+                ret.user = LogParseUtil.parseUserFromUGI(user);
+                ret.request = request;
+                return ret;
+            } catch(Exception e) {
                 LOG.error("Got exception when parsing audit log:" + logLine + ", exception:" + e.getMessage(), e);
-                ret = null;
             }
         }
-        return ret;
+        return null;
     }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hbase-securitylog/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/main/resources/log4j.properties b/eagle-security/eagle-security-hbase-securitylog/src/main/resources/log4j.properties
new file mode 100644
index 0000000..fb13ad5
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-securitylog/src/main/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=DEBUG, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hbase-securitylog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java b/eagle-security/eagle-security-hbase-securitylog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java
new file mode 100644
index 0000000..c1be351
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-securitylog/src/test/java/org/apache/eagle/security/hbase/TestHbaseAuditLogProcessTopology.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.hbase;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+import com.typesafe.config.ConfigSyntax;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+/**
+ * Verifies that a properties-syntax topology configuration string parses and that the section under
+ * {@link EagleConfigConstants#APP_CONFIG} exposes the expected application settings.
+ */
+public class TestHbaseAuditLogProcessTopology {
+    @Test
+    public void test() throws Exception {
+        ConfigParseOptions options = ConfigParseOptions.defaults()
+                .setSyntax(ConfigSyntax.PROPERTIES)
+                .setAllowMissing(false);
+        String topoConfigStr = "web.hbase.zookeeper.property.clientPort=2181\nweb.hbase.zookeeper.quorum=sandbox.hortonworks.com\n\napp.envContextConfig.env=storm\napp.envContextConfig.mode=local\napp.dataSourceConfig.topic=sandbox_hbase_security_log\napp.dataSourceConfig.zkConnection=sandbox.hortonworks.com:2181\napp.dataSourceConfig.zkConnectionTimeoutMS=15000\napp.dataSourceConfig.brokerZkPath=/brokers\napp.dataSourceConfig.fetchSize=1048586\napp.dataSourceConfig.transactionZKServers=sandbox.hortonworks.com\napp.dataSourceConfig.transactionZKPort=2181\napp.dataSourceConfig.transactionZKRoot=/consumers\napp.dataSourceConfig.consumerGroupId=eagle.hbasesecurity.consumer\napp.dataSourceConfig.transactionStateUpdateMS=2000\napp.dataSourceConfig.deserializerClass=org.apache.eagle.security.hbase.parse.HbaseAuditLogKafkaDeserializer\napp.eagleProps.site=sandbox\napp.eagleProps.application=hbaseSecurityLog\napp.eagleProps.dataJoinPollIntervalSec=30\napp.eagleProps.mailHost=mailHost.com\napp.eagleProps.mailSmtpPort=25\napp.eagleProps.mailDebug=true\napp.eagleProps.eagleService.host=localhost\napp.eagleProps.eagleService.port=9098\napp.eagleProps.eagleService.username=admin\napp.eagleProps.eagleService.password=secret";
+
+        Config topoConfig = ConfigFactory.parseString(topoConfigStr, options);
+        Config conf = topoConfig.getConfig(EagleConfigConstants.APP_CONFIG);
+
+        // Pin values from the application section so the test fails if it is not extracted correctly.
+        Assert.assertEquals("local", conf.getString("envContextConfig.mode"));
+        Assert.assertEquals("sandbox_hbase_security_log", conf.getString("dataSourceConfig.topic"));
+
+        // Only construction is exercised; the submit call below is left disabled in this test.
+        HbaseAuditLogMonitoringTopology topology = new HbaseAuditLogMonitoringTopology();
+        //topology.submit("", conf);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hbase-securitylog/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/test/resources/application.conf b/eagle-security/eagle-security-hbase-securitylog/src/test/resources/application.conf
new file mode 100644
index 0000000..2fc60a5
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-securitylog/src/test/resources/application.conf
@@ -0,0 +1,67 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Test configuration for the HBase security log topology (envContextConfig.mode is "local").
+
+{
+  "envContextConfig" : {
+    "env" : "storm",
+    "mode" : "local",
+    "topologyName" : "sandbox-hbaseSecurityLog-topology",
+    "stormConfigFile" : "security-auditlog-storm.yaml",
+    "parallelismConfig" : {
+      "kafkaMsgConsumer" : 1,
+      "hbaseSecurityLogAlertExecutor*" : 1
+    }
+  },
+  "dataSourceConfig": {
+    "topic" : "sandbox_hbase_security_log",
+    "zkConnection" : "sandbox.hortonworks.com:2181",
+    "zkConnectionTimeoutMS" : 15000,
+    "fetchSize" : 1048586,
+    "deserializerClass" : "org.apache.eagle.security.hbase.parse.HbaseAuditLogKafkaDeserializer",
+    "transactionZKServers" : "sandbox.hortonworks.com",
+    "transactionZKPort" : 2181,
+    "transactionZKRoot" : "/consumers",
+    "consumerGroupId" : "eagle.hbasesecurity.consumer",
+    "transactionStateUpdateMS" : 2000
+  },
+  "alertExecutorConfigs" : {
+    "hbaseSecurityLogAlertExecutor" : {
+      "parallelism" : 1,
+      "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner",
+      "needValidation" : "true"
+    }
+  },
+  "eagleProps" : {
+    "site" : "sandbox",
+    "application": "hbaseSecurityLog",
+    "dataJoinPollIntervalSec" : 30,
+    "mailHost" : "mailHost.com",
+    "mailSmtpPort":"25",
+    "mailDebug" : "true",
+    "eagleService": {
+      "host": "localhost",
+      "port": 9098,
+      "username": "admin",
+      "password": "secret"
+    }
+  },
+  "dynamicConfigSource" : {
+    "enabled" : true,
+    "initDelayMillis" : 0,
+    "delayMillis" : 30000
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hbase-securitylog/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/test/resources/log4j.properties b/eagle-security/eagle-security-hbase-securitylog/src/test/resources/log4j.properties
new file mode 100644
index 0000000..25331ab
--- /dev/null
+++ b/eagle-security/eagle-security-hbase-securitylog/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+ eagle.log.dir=../logs
+ eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hdfs-auditlog/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/pom.xml b/eagle-security/eagle-security-hdfs-auditlog/pom.xml
index c8bc8e7..09bf007 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/pom.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/pom.xml
@@ -35,6 +35,11 @@
         <version>${project.version}</version>
   	</dependency>
     <dependency>
+      <groupId>org.apache.eagle</groupId>
+      <artifactId>eagle-stream-application-manager</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
         	<groupId>org.apache.eagle</groupId>
     		<artifactId>eagle-embed-server</artifactId>
             <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java
new file mode 100644
index 0000000..a7f207e
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+import org.apache.eagle.stream.application.TopologyExecutable;
+
+/**
+ * {@link TopologyExecutable} that builds a Storm execution environment from the given
+ * configuration and delegates execution to {@code HdfsAuditLogProcessorMain}.
+ *
+ * <p>The partitioning variant is chosen by the optional boolean setting
+ * {@code eagleProps.balancePartitionEnabled}; when it is absent the default partitioning is used.
+ */
+public class HdfsAuditLogMonitoringTopology implements TopologyExecutable {
+    /** Config path of the optional flag that selects balanced partitioning. */
+    private static final String BALANCE_PARTITION_KEY = "eagleProps.balancePartitionEnabled";
+
+    /**
+     * Creates the Storm environment and Kafka spout provider, then executes with balanced or
+     * default partitioning depending on the flag read from {@code config}.
+     *
+     * @param topology not used by this implementation
+     * @param config   configuration passed to the Storm environment and read for the partition flag
+     */
+    @Override
+    public void submit(String topology, Config config) {
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
+        KafkaSourcedSpoutProvider provider = HdfsAuditLogProcessorMain.createProvider(env.getConfig());
+        // Primitive boolean: the flag is only tested in the if below, so boxing it is unnecessary.
+        boolean balancePartition = config.hasPath(BALANCE_PARTITION_KEY) && config.getBoolean(BALANCE_PARTITION_KEY);
+        if (balancePartition) {
+            HdfsAuditLogProcessorMain.execWithBalancedPartition(env, provider);
+        } else {
+            HdfsAuditLogProcessorMain.execWithDefaultPartition(env, provider);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
index a28e532..a4fed79 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
@@ -63,7 +63,7 @@ public class FileSensitivityPollingJob implements Job{
 	private List<FileSensitivityAPIEntity> load(JobDataMap jobDataMap) throws Exception{
 		Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
 		String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
-		Integer eagleServicePort = (Integer)map.get(EagleConfigConstants.PORT);
+		Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
 		String username = map.containsKey(EagleConfigConstants.USERNAME) ? (String)map.get(EagleConfigConstants.USERNAME) : null;
 		String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String)map.get(EagleConfigConstants.PASSWORD) : null;
 		// load from eagle database

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
index ca6dc16..2f7efc8 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
@@ -64,7 +64,7 @@ public class IPZonePollingJob implements Job{
 	private List<IPZoneEntity> load(JobDataMap jobDataMap) throws Exception{
 		Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
 		String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
-		Integer eagleServicePort = (Integer)map.get(EagleConfigConstants.PORT);
+		Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
 		String username = map.containsKey(EagleConfigConstants.USERNAME) ? (String)map.get(EagleConfigConstants.USERNAME) : null;
 		String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String)map.get(EagleConfigConstants.PASSWORD) : null;
 		// load from eagle database

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hive/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/pom.xml b/eagle-security/eagle-security-hive/pom.xml
index 413a782..efa534f 100644
--- a/eagle-security/eagle-security-hive/pom.xml
+++ b/eagle-security/eagle-security-hive/pom.xml
@@ -34,7 +34,12 @@
 	      <groupId>org.apache.eagle</groupId>
 	  	  <artifactId>eagle-storm-jobrunning-spout</artifactId>
           <version>${project.version}</version>
-	   </dependency>	   
+	   </dependency>
+      <dependency>
+          <groupId>org.apache.eagle</groupId>
+          <artifactId>eagle-stream-application-manager</artifactId>
+          <version>${project.version}</version>
+      </dependency>
 	   <dependency>
 	      <groupId>org.apache.curator</groupId>
 	  	  <artifactId>curator-framework</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveJobRunningMonitoringTopology.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveJobRunningMonitoringTopology.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveJobRunningMonitoringTopology.java
new file mode 100644
index 0000000..81f329d
--- /dev/null
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/HiveJobRunningMonitoringTopology.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.hive;
+
+
+import com.typesafe.config.Config;
+import org.apache.eagle.datastream.ExecutionEnvironments;
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
+import org.apache.eagle.security.hive.jobrunning.HiveJobRunningSourcedStormSpoutProvider;
+import org.apache.eagle.security.hive.jobrunning.HiveQueryParserExecutor;
+import org.apache.eagle.security.hive.jobrunning.JobConfigurationAdaptorExecutor;
+import org.apache.eagle.security.hive.sensitivity.HiveResourceSensitivityDataJoinExecutor;
+import org.apache.eagle.stream.application.TopologyExecutable;
+
+import java.util.Arrays;
+/** Runs the Hive running-job monitoring pipeline on Storm: the job-running spout feeds JobConfigurationAdaptorExecutor, HiveQueryParserExecutor and HiveResourceSensitivityDataJoinExecutor, then alertWithConsumer. */
+public class HiveJobRunningMonitoringTopology implements TopologyExecutable {
+    @Override
+    public void submit(String topology, Config config) { // NOTE(review): the topology argument is not read in this method
+        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
+        String spoutName = "msgConsumer";
+        int parallelism = env.getConfig().getInt("envContextConfig.parallelismConfig." + spoutName); // parallelism is configured per spout name
+        env.fromSpout(new HiveJobRunningSourcedStormSpoutProvider().getSpout(env.getConfig(), parallelism))
+                .withOutputFields(4).nameAs(spoutName).groupBy(Arrays.asList(0))
+                .flatMap(new JobConfigurationAdaptorExecutor()).groupBy(Arrays.asList(0))
+                .flatMap(new HiveQueryParserExecutor()).groupBy(Arrays.asList(0))
+                .flatMap(new HiveResourceSensitivityDataJoinExecutor())
+                .alertWithConsumer("hiveAccessLogStream", "hiveAccessAlertByRunningJob");
+        env.execute();
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
index 9ddf0b2..729f519 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/jobrunning/HiveJobRunningSourcedStormSpoutProvider.java
@@ -17,6 +17,7 @@
 package org.apache.eagle.security.hive.jobrunning;
 
 import backtype.storm.topology.base.BaseRichSpout;
+import org.apache.eagle.job.DefaultJobPartitionerImpl;
 import org.apache.eagle.job.JobPartitioner;
 import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig;
 import org.apache.eagle.jobrunning.config.RunningJobCrawlConfig.ControlConfig;
@@ -69,9 +70,11 @@ public class HiveJobRunningSourcedStormSpoutProvider {
 			controlConfig.partitionerCls = (Class<? extends JobPartitioner>)Class.forName(config.getString("dataSourceConfig.partitionerCls"));
 		}
 		catch(Exception ex){
-			LOG.error("failing find job id partitioner class " + config.getString("dataSourceConfig.partitionerCls"));
-			throw new IllegalStateException("jobId partitioner class does not exist " + config.getString("dataSourceConfig.partitionerCls"));
-		}
+			LOG.warn("failing find job id partitioner class " + config.getString("dataSourceConfig.partitionerCls"));
+			//throw new IllegalStateException("jobId partitioner class does not exist " + config.getString("dataSourceConfig.partitionerCls"));
+            controlConfig.partitionerCls = DefaultJobPartitionerImpl.class;
+
+        }
 		
 		JobRunningSpout spout = new JobRunningSpout(crawlConfig);
 		return spout;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
index ff7462c..b7d9e9c 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
@@ -66,7 +66,7 @@ public class HiveResourceSensitivityPollingJob implements Job {
     private List<HiveResourceSensitivityAPIEntity> load(JobDataMap jobDataMap) throws Exception {
         Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
         String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
-        Integer eagleServicePort = (Integer)map.get(EagleConfigConstants.PORT);
+        Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
         String username = map.containsKey(EagleConfigConstants.USERNAME) ? (String)map.get(EagleConfigConstants.USERNAME) : null;
         String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String)map.get(EagleConfigConstants.PASSWORD) : null;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 7fc9875..bbf0a91 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -87,12 +87,12 @@
             <artifactId>eagle-hadoop-metric</artifactId>
             <version>${project.version}</version>
         </dependency>
-	<dependency>
+	    <dependency>
             <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-gc</artifactId>
             <version>${project.version}</version>
         </dependency>
-<dependency>
+        <dependency>
             <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-alert-notification-plugin</artifactId>
             <version>${project.version}</version>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml b/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml
index 2a0d211..97e36b8 100644
--- a/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml
+++ b/eagle-topology-assembly/src/assembly/eagle-topology-assembly.xml
@@ -56,8 +56,8 @@
             <directory>${project.build.outputDirectory}</directory>
             <outputDirectory>/</outputDirectory>
             <excludes>
-                <exclude>application.conf</exclude>
-                <exclude>log4j.properties</exclude>
+                <exclude>**/application.conf</exclude>
+                <exclude>**/log4j.properties</exclude>
                 <exclude>**/storm.yaml.1</exclude>
             </excludes>
             <includes>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-webservice/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-webservice/pom.xml b/eagle-webservice/pom.xml
index abddd91..2d16f2f 100644
--- a/eagle-webservice/pom.xml
+++ b/eagle-webservice/pom.xml
@@ -140,12 +140,28 @@
 				</exclusion>
 			</exclusions>
 		</dependency>
-
 		<dependency>
 			<groupId>org.apache.eagle</groupId>
 			<artifactId>eagle-security-hdfs-web</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<!--
+		<dependency>
+			<groupId>org.apache.eagle</groupId>
+			<artifactId>eagle-stream-application-manager</artifactId>
+			<version>${project.version}</version>
+		</dependency> -->
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-topology-assembly</artifactId>
+            <version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.quartz-scheduler</groupId>
+					<artifactId>quartz</artifactId>
+				</exclusion>
+			</exclusions>
+        </dependency>
 
 		<!-- eagle user profile common dependency -->
 		<dependency>
@@ -163,7 +179,6 @@
 			<artifactId>eagle-machinelearning-base</artifactId>
 			<version>${project.version}</version>
 		</dependency>
-
 		<dependency>
 			<groupId>org.apache.eagle</groupId>
 			<artifactId>eagle-alert-base</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-webservice/src/main/java/org/apache/eagle/service/security/profile/ApplicationSchedulerListener.java
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/java/org/apache/eagle/service/security/profile/ApplicationSchedulerListener.java b/eagle-webservice/src/main/java/org/apache/eagle/service/security/profile/ApplicationSchedulerListener.java
new file mode 100644
index 0000000..a225192
--- /dev/null
+++ b/eagle-webservice/src/main/java/org/apache/eagle/service/security/profile/ApplicationSchedulerListener.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.security.profile;
+
+
+import akka.actor.ActorSystem;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.stream.application.scheduler.ApplicationScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.Duration;
+
+import javax.servlet.ServletContextEvent;
+import javax.servlet.ServletContextListener;
+import java.util.concurrent.TimeUnit;
+/** Servlet context listener that starts the ApplicationScheduler on web application startup and shuts down its ActorSystem on context destruction. */
+public class ApplicationSchedulerListener implements ServletContextListener {
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationSchedulerListener.class);
+
+    //@Autowired
+    private ActorSystem system;
+
+    @Override
+    public void contextInitialized(ServletContextEvent servletContextEvent) {
+        // Spring-context injection of the actor system is disabled (commented-out call below); the scheduler creates it instead.
+        //SpringBeanAutowiringSupport.processInjectionBasedOnCurrentContext(this);
+        Config config = ConfigFactory.load("eagle-scheduler.conf"); // scheduler settings come from this resource
+        system = new ApplicationScheduler().start(config); // keep the returned ActorSystem so contextDestroyed can stop it
+    }
+
+    @Override
+    public void contextDestroyed(ServletContextEvent servletContextEvent) {
+        if (system != null) {
+            LOG.info("Killing ActorSystem as a part of web application ctx destruction.");
+            system.shutdown();
+            system.awaitTermination(Duration.create(15, TimeUnit.SECONDS)); // wait up to 15 seconds for termination
+        } else {
+            LOG.warn("No actor system loaded, yet trying to shut down. Check AppContext config and consider if you need this listener.");
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-webservice/src/main/java/org/apache/eagle/service/security/profile/EagleServiceProfileInitializer.java
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/java/org/apache/eagle/service/security/profile/EagleServiceProfileInitializer.java b/eagle-webservice/src/main/java/org/apache/eagle/service/security/profile/EagleServiceProfileInitializer.java
index fd28453..600e17a 100644
--- a/eagle-webservice/src/main/java/org/apache/eagle/service/security/profile/EagleServiceProfileInitializer.java
+++ b/eagle-webservice/src/main/java/org/apache/eagle/service/security/profile/EagleServiceProfileInitializer.java
@@ -17,6 +17,7 @@
 package org.apache.eagle.service.security.profile;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.stream.application.scheduler.ApplicationScheduler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContextInitializer;
@@ -37,5 +38,7 @@ public class EagleServiceProfileInitializer implements ApplicationContextInitial
         logger.info("Eagle service use env: " + profile);
         applicationContext.getEnvironment().setActiveProfiles(profile);
         applicationContext.refresh();
+
+        //new ApplicationScheduler().startDeamon();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-webservice/src/main/resources/application-derby.conf
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/resources/application-derby.conf b/eagle-webservice/src/main/resources/application-derby.conf
new file mode 100644
index 0000000..c922a7e
--- /dev/null
+++ b/eagle-webservice/src/main/resources/application-derby.conf
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+eagle {
+	service {
+		storage-type="jdbc"
+		storage-adapter="derby"
+		storage-username="eagle"
+		storage-password=eagle
+		storage-database=eagle
+		storage-connection-url="jdbc:derby:/tmp/eagle-db-dev;create=true"
+		storage-connection-props="encoding=UTF-8"
+		storage-driver-class="org.apache.derby.jdbc.EmbeddedDriver"
+		storage-connection-max=8
+	}
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-webservice/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/resources/application.conf b/eagle-webservice/src/main/resources/application.conf
index ca936ed..cfa6a0b 100644
--- a/eagle-webservice/src/main/resources/application.conf
+++ b/eagle-webservice/src/main/resources/application.conf
@@ -13,16 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-eagle {
-	service {
-		storage-type="jdbc"
-		storage-adapter="derby"
-		storage-username="eagle"
-		storage-password=eagle
-		storage-database=eagle
-		storage-connection-url="jdbc:derby:/tmp/eagle-db-dev;create=true"
-		storage-connection-props="encoding=UTF-8"
-		storage-driver-class="org.apache.derby.jdbc.EmbeddedDriver"
-		storage-connection-max=8
+
+eagle{
+	service{
+		storage-type="hbase"
+		hbase-zookeeper-quorum="sandbox.hortonworks.com"
+		hbase-zookeeper-property-clientPort=2181
+		zookeeper-znode-parent="/hbase-unsecure",
+		springActiveProfile="sandbox"
+		audit-enabled=true
 	}
-}
\ No newline at end of file
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-webservice/src/main/resources/eagle-scheduler.conf
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/resources/eagle-scheduler.conf b/eagle-webservice/src/main/resources/eagle-scheduler.conf
new file mode 100644
index 0000000..aaab131
--- /dev/null
+++ b/eagle-webservice/src/main/resources/eagle-scheduler.conf
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+### scheduler properties
+appCommandLoaderIntervalSecs = 1
+appHealthCheckIntervalSecs = 5
+
+### execution platform properties
+envContextConfig.env = "storm"
+envContextConfig.url = "http://sandbox.hortonworks.com:8744"
+envContextConfig.nimbusHost = "sandbox.hortonworks.com"
+envContextConfig.nimbusThriftPort = 6627
+envContextConfig.jarFile = "/dir-to-jar/eagle-topology-0.3.0-incubating-assembly.jar"
+
+### default topology properties
+eagleProps.mailHost = "mailHost.com"
+eagleProps.mailSmtpPort = "25"
+eagleProps.mailDebug = "true"
+eagleProps.eagleService.host = "localhost"
+eagleProps.eagleService.port = 9099
+eagleProps.eagleService.username = "admin"
+eagleProps.eagleService.password = "secret"
+eagleProps.dataJoinPollIntervalSec = 30
+
+dynamicConfigSource.enabled = true
+dynamicConfigSource.initDelayMillis = 0
+dynamicConfigSource.delayMillis = 30000
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-webservice/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/webapp/WEB-INF/web.xml b/eagle-webservice/src/main/webapp/WEB-INF/web.xml
index ed56a73..1907767 100644
--- a/eagle-webservice/src/main/webapp/WEB-INF/web.xml
+++ b/eagle-webservice/src/main/webapp/WEB-INF/web.xml
@@ -102,6 +102,12 @@
     <listener>
         <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
     </listener>
+
+    <!-- AKKA System Setup -->
+    <listener>
+        <listener-class>org.apache.eagle.service.security.profile.ApplicationSchedulerListener</listener-class>
+    </listener>
+
     <session-config>
         <!-- in minutes -->
         <session-timeout>60</session-timeout>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-webservice/src/main/webapp/app/public/css/main.css
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/webapp/app/public/css/main.css b/eagle-webservice/src/main/webapp/app/public/css/main.css
index f07c3fb..a7eba4b 100644
--- a/eagle-webservice/src/main/webapp/app/public/css/main.css
+++ b/eagle-webservice/src/main/webapp/app/public/css/main.css
@@ -779,6 +779,14 @@ td.text-ellipsis {
 	line-height: 100%;
 }
 
+pre.noWrap {
+	border: none;
+	border-radius: 0;
+	background: transparent;
+	margin: 0;
+	padding: 0;
+}
+
 .noSelect {
 	-khtml-user-select: none;
 	-moz-user-select: none;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-webservice/src/main/webapp/app/public/feature/metrics/controller.js
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/webapp/app/public/feature/metrics/controller.js b/eagle-webservice/src/main/webapp/app/public/feature/metrics/controller.js
index a2ac640..d717ad1 100644
--- a/eagle-webservice/src/main/webapp/app/public/feature/metrics/controller.js
+++ b/eagle-webservice/src/main/webapp/app/public/feature/metrics/controller.js
@@ -69,7 +69,7 @@
 
 	feature.controller('dashboard', function(PageConfig, $scope, $http, $q, UI, Site, Authorization, Application, Entities, DashboardFormatter) {
 		var _siteApp = Site.currentSiteApplication();
-		var _druidConfig = _siteApp.configObj.druid;
+		var _druidConfig = _siteApp.configObj.getValueByPath("web.druid");
 		var _refreshInterval;
 
 		var _menu_newChart;