Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/12 03:40:29 UTC

[2/3] incubator-eagle git commit: hdfs, hbase, mapr app conversion. Author: Yong Zhang. Closes #334

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
new file mode 100644
index 0000000..ccbce98
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.partition.CustomPartitionGrouping;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.partition.PartitionStrategy;
+import org.apache.eagle.partition.PartitionStrategyImpl;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
+import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
+import storm.kafka.StringScheme;
+
+/**
+ * Base Storm application wiring the HDFS audit log topology:
+ * Kafka spout -> parser -> sensitivity join -> IP zone join -> stream sink.
+ *
+ * @since 8/10/16
+ */
+public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
+    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+    public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
+    public final static String SENSITIVITY_JOIN_TASK_NUM = "topology.numOfSensitivityJoinTasks";
+    public final static String IPZONE_JOIN_TASK_NUM = "topology.numOfIPZoneJoinTasks";
+    public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
+        NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
+        IRichSpout spout = provider.getSpout(config);
+
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
+        int numOfSensitivityJoinTasks = config.getInt(SENSITIVITY_JOIN_TASK_NUM);
+        int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM);
+        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
+        builder.setSpout("ingest", spout, numOfSpoutTasks);
+
+
+        BaseRichBolt parserBolt = getParserBolt();
+        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks);
+
+        boolean useDefaultPartition = !config.hasPath("eagleProps.useDefaultPartition") || config.getBoolean("eagleProps.useDefaultPartition");
+        if (useDefaultPartition) {
+            boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
+        } else {
+            boltDeclarer.customGrouping("ingest", new CustomPartitionGrouping(createStrategy(config)));
+        }
+
+        FileSensitivityDataJoinBolt sensitivityDataJoinBolt = new FileSensitivityDataJoinBolt(config);
+        BoltDeclarer sensitivityDataJoinBoltDeclarer = builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, numOfSensitivityJoinTasks);
+        sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+
+        IPZoneDataJoinBolt ipZoneDataJoinBolt = new IPZoneDataJoinBolt(config);
+        BoltDeclarer ipZoneDataJoinBoltDeclarer = builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, numOfIPZoneJoinTasks);
+        ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new Fields("user"));
+
+        StormStreamSink sinkBolt = environment.getStreamSink(getSinkStreamName(), config);
+        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
+        kafkaBoltDeclarer.fieldsGrouping("ipZoneJoin", new Fields("user"));
+        return builder.createTopology();
+
+
+    }
+
+    public abstract BaseRichBolt getParserBolt();
+    public abstract String getSinkStreamName();
+
+    public static PartitionStrategy createStrategy(Config config) {
+        // TODO: refactor configuration structure to avoid repeated config processing ~ hao
+        String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+        Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+        String topic = config.getString("dataSourceConfig.topic");
+        DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
+        PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
+        String key1 = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
+        Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? config.getInt(key1) : 60;
+        String key2 = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
+        Integer kafkaStatisticRangeInMin =  config.hasPath(key2) ? config.getInt(key2) : 60;
+        PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
+        return strategy;
+    }
+
+
+}
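
A note on configuration: execute() reads the five task-count keys with unguarded config.getInt() calls, so a deployment must define all of them. A minimal sketch of a Config carrying just the keys this class touches (key names are taken from the constants above; the values, and the additional dataSourceConfig.* settings the Kafka spout provider needs, are illustrative):

    // Minimal sketch, assuming illustrative values; key names come from
    // AbstractHdfsAuditLogApplication above. The Kafka spout provider requires
    // further dataSourceConfig.* settings (see the provider XML later in this commit).
    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class HdfsAuditLogConfigSketch {
        public static Config minimal() {
            return ConfigFactory.parseString(
                      "topology.numOfSpoutTasks = 2\n"
                    + "topology.numOfParserTasks = 2\n"
                    + "topology.numOfSensitivityJoinTasks = 2\n"
                    + "topology.numOfIPZoneJoinTasks = 2\n"
                    + "topology.numOfSinkTasks = 2\n"
                    + "eagleProps.useDefaultPartition = true\n"   // false switches to CustomPartitionGrouping
                    + "dataSourceConfig.topic = hdfs_audit_log\n" // read by createStrategy() when partitioning is custom
            );
        }
    }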

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java
new file mode 100644
index 0000000..6cbbde6
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.security.auditlog.timer.FileSensitivityPollingJob;
+import org.apache.eagle.security.auditlog.util.SimplifyPath;
+import org.apache.eagle.security.service.HdfsSensitivityEntity;
+import org.apache.eagle.security.util.ExternalDataCache;
+import org.apache.eagle.security.util.ExternalDataJoiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Pattern;
+
+public class FileSensitivityDataJoinBolt extends BaseRichBolt {
+    private static Logger LOG = LoggerFactory.getLogger(FileSensitivityDataJoinBolt.class);
+    private Config config;
+    private OutputCollector collector;
+
+    public FileSensitivityDataJoinBolt(Config config){
+        this.config = config;
+    }
+
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        // start hdfs sensitivity data polling
+        try {
+            ExternalDataJoiner joiner = new ExternalDataJoiner(
+                    FileSensitivityPollingJob.class, config, context.getThisComponentId() + "." + context.getThisTaskIndex());
+            joiner.start();
+        } catch (Exception ex) {
+            LOG.error("Failed to bring up quartz scheduler", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        try {
+            Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(0);
+            Map<String, Object> event = new TreeMap<String, Object>(toBeCopied);
+            // the polling job now caches HdfsSensitivityEntity (see FileSensitivityPollingJob below)
+            Map<String, HdfsSensitivityEntity> map =
+                    (Map<String, HdfsSensitivityEntity>) ExternalDataCache.getInstance()
+                            .getJobResult(FileSensitivityPollingJob.class);
+            HdfsSensitivityEntity e = null;
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Receive map: " + map + "event: " + event);
+            }
+
+            String src = (String) event.get("src");
+            if (map != null && src != null) {
+                String simplifiedPath = new SimplifyPath().build(src);
+                // the pattern depends only on src, so compile it once outside the loop
+                Pattern pattern = Pattern.compile(simplifiedPath, Pattern.CASE_INSENSITIVE);
+                for (String fileDir : map.keySet()) {
+                    if (pattern.matcher(fileDir).matches()) {
+                        e = map.get(fileDir);
+                        break;
+                    }
+                }
+            }
+            event.put("sensitivityType", e == null ? "NA" : e.getSensitivityType());
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("After file sensitivity lookup: " + event);
+            }
+            // LOG.info(">>>> After file sensitivity lookup: " + event);
+            collector.emit(Arrays.asList(event.get("user"), event));
+        } catch (Exception ex) {
+            LOG.error("Error joining data, ignoring this event", ex);
+        } finally {
+            collector.ack(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("user", "message"));
+    }
+}
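
The join above compiles the simplified src path into a case-insensitive regex and tests it against every registered file directory, taking the first match. A standalone sketch of that lookup, assuming SimplifyPath yields a regex over directory paths (the map entries below are hypothetical):

    // Mirrors the matching loop in execute() above: the first registered directory
    // matching the simplified-path regex determines the sensitivity type, else "NA".
    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.regex.Pattern;

    public class SensitivityLookupSketch {
        static String lookup(String simplifiedPathRegex, Map<String, String> sensitivityByDir) {
            Pattern pattern = Pattern.compile(simplifiedPathRegex, Pattern.CASE_INSENSITIVE);
            for (Map.Entry<String, String> entry : sensitivityByDir.entrySet()) {
                if (pattern.matcher(entry.getKey()).matches()) {
                    return entry.getValue();
                }
            }
            return "NA";
        }

        public static void main(String[] args) {
            Map<String, String> byDir = new LinkedHashMap<>();
            byDir.put("/tmp/private", "PRIVATE");                // hypothetical sensitivity entry
            System.out.println(lookup("/tmp/private.*", byDir)); // prints PRIVATE
        }
    }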

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
deleted file mode 100644
index 33d29d0..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.auditlog;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.security.auditlog.timer.FileSensitivityPollingJob;
-import org.apache.eagle.security.auditlog.util.SimplifyPath;
-import org.apache.eagle.security.entity.FileSensitivityAPIEntity;
-import org.apache.eagle.security.util.ExternalDataCache;
-import org.apache.eagle.security.util.ExternalDataJoiner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class FileSensitivityDataJoinExecutor extends JavaStormStreamExecutor2<String, Map> {
-	private static final Logger LOG = LoggerFactory.getLogger(FileSensitivityDataJoinExecutor.class);
-	private Config config;
-	
-	@Override
-	public void prepareConfig(Config config) {
-		this.config = config;		
-	}
-
-	@Override
-	public void init() {
-		// start IPZone data polling
-		try{
-			ExternalDataJoiner joiner = new ExternalDataJoiner(FileSensitivityPollingJob.class, config, "1");
-			joiner.start();
-		}catch(Exception ex){
-			LOG.error("Fail bring up quartz scheduler", ex);
-			throw new IllegalStateException(ex);
-		}
-	}
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){
-        Map<String, Object> toBeCopied = (Map<String, Object>)input.get(1);
-        Map<String, Object> event = new TreeMap<String, Object>(toBeCopied);
-        Map<String, FileSensitivityAPIEntity> map = (Map<String, FileSensitivityAPIEntity>) ExternalDataCache.getInstance().getJobResult(FileSensitivityPollingJob.class);
-        FileSensitivityAPIEntity e = null;
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Receive map: " + map + "event: " + event);
-        }
-
-        String src = (String)event.get("src");
-        if(map != null && src != null) {
-            String simplifiedPath = new SimplifyPath().build(src);
-            for (String fileDir : map.keySet()) {
-                Pattern pattern = Pattern.compile(simplifiedPath,Pattern.CASE_INSENSITIVE);
-                Matcher matcher = pattern.matcher(fileDir);
-                boolean isMatched = matcher.matches();
-                if (isMatched) {
-                    e = map.get(fileDir);
-                    break;
-                }
-            }
-        }
-        event.put("sensitivityType",  e == null ? "NA" : e.getSensitivityType());
-        if(LOG.isDebugEnabled()) {
-            LOG.debug("After file sensitivity lookup: " + event);
-        }
-        // LOG.info(">>>> After file sensitivity lookup: " + event);
-        outputCollector.collect(new Tuple2(event.get("user"), event));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java
new file mode 100644
index 0000000..fcf9d4f
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+/**
+ * Application provider for {@link HdfsAuditLogApplication}.
+ *
+ * @since 8/11/16
+ */
+public class HdfsAuditLogAppProvider extends AbstractApplicationProvider<HdfsAuditLogApplication> {
+    @Override
+    public HdfsAuditLogApplication getApplication() {
+        return new HdfsAuditLogApplication();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
new file mode 100644
index 0000000..791572b
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.topology.base.BaseRichBolt;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * HDFS audit log monitoring application.
+ *
+ * @since 8/11/16
+ */
+public class HdfsAuditLogApplication extends AbstractHdfsAuditLogApplication {
+    @Override
+    public BaseRichBolt getParserBolt() {
+        return new HdfsAuditLogParserBolt();
+    }
+
+    @Override
+    public String getSinkStreamName() {
+        return "hdfs_audit_log_stream";
+    }
+
+    public static void main(String[] args){
+        Config config = ConfigFactory.load();
+        HdfsAuditLogApplication app = new HdfsAuditLogApplication();
+        app.run(config);
+    }
+}
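
Note that main() relies on ConfigFactory.load(), which resolves application.conf (plus reference.conf and system-property overrides) from the classpath, so settings like those sketched after AbstractHdfsAuditLogApplication above must be supplied there when launching this class directly.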

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java
deleted file mode 100644
index 08ab993..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.auditlog;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
-
-import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
-import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
-import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
-
-public class HdfsAuditLogKafkaDeserializer implements SpoutKafkaMessageDeserializer{
-	private static Logger LOG = LoggerFactory.getLogger(HdfsAuditLogKafkaDeserializer.class);
-	private Properties props;
-
-	public  HdfsAuditLogKafkaDeserializer(Properties props){
-		this.props = props;
-	}
-	
-	/**
-	 * the steps for deserializing message from kafka
-	 * 1. convert byte array to string
-	 * 2. parse string to eagle entity
-	 */
-	@Override
-	public Object deserialize(byte[] arg0) {
-		String logLine = new String(arg0);
-
-		HDFSAuditLogParser parser = new HDFSAuditLogParser();
-		HDFSAuditLogObject entity = null;
-		try{
-			entity = parser.parse(logLine);
-		}catch(Exception ex){
-			LOG.error("Failing parse audit log message", ex);
-		}
-		if(entity == null){
-			LOG.warn("Event ignored as it can't be correctly parsed, the log is ", logLine);
-			return null;
-		}
-		Map<String, Object> map = new TreeMap<String, Object>();
-		map.put("src", entity.src);
-		map.put("dst", entity.dst);
-		map.put("host", entity.host);
-		map.put("timestamp", entity.timestamp);
-		map.put("allowed", entity.allowed);
-		map.put("user", entity.user);
-		map.put("cmd", entity.cmd);
-		
-		return map;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java
deleted file mode 100644
index a7f207e..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.security.auditlog;
-
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.stream.application.TopologyExecutable;
-
-public class HdfsAuditLogMonitoringTopology implements TopologyExecutable {
-    @Override
-    public void submit(String topology, Config config) {
-        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
-        KafkaSourcedSpoutProvider provider = HdfsAuditLogProcessorMain.createProvider(env.getConfig());
-        Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") && config.getBoolean("eagleProps.balancePartitionEnabled");
-        if (balancePartition) {
-            HdfsAuditLogProcessorMain.execWithBalancedPartition(env, provider);
-        } else {
-            HdfsAuditLogProcessorMain.execWithDefaultPartition(env, provider);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
new file mode 100644
index 0000000..5ea5950
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
+import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Parses a raw HDFS audit log line into a field map
+ * (src, dst, host, timestamp, allowed, user, cmd).
+ *
+ * @since 8/10/16
+ */
+public class HdfsAuditLogParserBolt extends BaseRichBolt {
+    private static Logger LOG = LoggerFactory.getLogger(HdfsAuditLogParserBolt.class);
+    private HDFSAuditLogParser parser;
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+        this.parser = new HDFSAuditLogParser(); // created once and reused for every tuple
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        String logLine = input.getString(0);
+
+        try {
+            HDFSAuditLogObject entity = parser.parse(logLine);
+            Map<String, Object> map = new TreeMap<String, Object>();
+            map.put("src", entity.src);
+            map.put("dst", entity.dst);
+            map.put("host", entity.host);
+            map.put("timestamp", entity.timestamp);
+            map.put("allowed", entity.allowed);
+            map.put("user", entity.user);
+            map.put("cmd", entity.cmd);
+            collector.emit(Arrays.asList(map));
+        } catch (Exception ex) {
+            LOG.error("Failed to parse audit log line: " + logLine, ex);
+        } finally {
+            collector.ack(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("f1"));
+    }
+}
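
For orientation, a standard Hadoop FSNamesystem audit line such as

    2016-08-10 11:20:04,023 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.0.1 cmd=open src=/tmp/private dst=null perm=null

is turned by HDFSAuditLogParser (not part of this commit) into the map this bolt emits as the single field "f1". A hedged sketch of that shape, with illustrative values matching the sample line:

    // Key set matches the map.put() calls above; values are illustrative.
    import java.util.Map;
    import java.util.TreeMap;

    public class ParsedAuditEventSketch {
        public static Map<String, Object> sample() {
            Map<String, Object> map = new TreeMap<String, Object>();
            map.put("src", "/tmp/private");
            map.put("dst", null);                 // no rename destination for "open"
            map.put("host", "10.0.0.1");          // client address; exact normalization is up to the parser
            map.put("timestamp", 1470827204023L); // epoch millis of the log line
            map.put("allowed", true);
            map.put("user", "hdfs");
            map.put("cmd", "open");
            return map;
        }
    }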

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
deleted file mode 100644
index 60b0e36..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-package org.apache.eagle.security.auditlog;
-
-import backtype.storm.spout.SchemeAsMultiScheme;
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.time.DateUtils;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.core.StreamProducer;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.partition.DataDistributionDao;
-import org.apache.eagle.partition.PartitionAlgorithm;
-import org.apache.eagle.partition.PartitionStrategy;
-import org.apache.eagle.partition.PartitionStrategyImpl;
-import org.apache.eagle.security.partition.DataDistributionDaoImpl;
-import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class HdfsAuditLogProcessorMain {
-    public static PartitionStrategy createStrategy(Config config) {
-        // TODO: Refactor configuration structure to avoid repeated config processing configure ~ hao
-        String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-        Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
-        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
-        String topic = config.getString("dataSourceConfig.topic");
-        DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
-        PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
-        String key1 = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
-        Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? config.getInt(key1) : 60;
-        String key2 = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
-        Integer kafkaStatisticRangeInMin =  config.hasPath(key2) ? config.getInt(key2) : 60;
-        PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
-        return strategy;
-    }
-
-    public static KafkaSourcedSpoutProvider createProvider(Config config) {
-         String deserClsName = config.getString("dataSourceConfig.deserializerClass");
-         final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
-                 @Override
-                 public List<Object> deserialize(byte[] ser) {
-                         Object tmp = deserializer.deserialize(ser);
-                         Map<String, Object> map = (Map<String, Object>)tmp;
-                         if(tmp == null) return null;
-                         return Arrays.asList(map.get("user"), tmp);
-                 }
-         };
-
-         KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
-                 @Override
-                 public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
-                         return new SchemeAsMultiScheme(scheme);
-                  }
-         };
-         return provider;
-    }
-
-    @SuppressWarnings("unchecked")
-    public static void execWithDefaultPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
-        StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(Arrays.asList(0));
-        //StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0));
-        //source.streamUnion(reassembler)
-        source.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
-              .flatMap(new IPZoneDataJoinExecutor())
-              .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor");
-        env.execute();
-    }
-
-    @SuppressWarnings("unchecked")
-    public static void execWithBalancedPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
-        PartitionStrategy strategy = createStrategy(env.getConfig());
-        StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(strategy);
-        //StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0));
-        //source.streamUnion(reassembler)
-        source.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
-                .flatMap(new IPZoneDataJoinExecutor())
-                .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor");
-        env.execute();
-    }
-
-	public static void main(String[] args) throws Exception{
-        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
-        Config config = env.getConfig();
-        KafkaSourcedSpoutProvider provider = createProvider(env.getConfig());
-        Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") && config.getBoolean("eagleProps.balancePartitionEnabled");
-        if (balancePartition) {
-            execWithBalancedPartition(env, provider);
-        } else {
-            execWithDefaultPartition(env, provider);
-        }
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java
new file mode 100644
index 0000000..d02f959
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.security.auditlog.timer.IPZonePollingJob;
+import org.apache.eagle.security.service.IPZoneEntity;
+import org.apache.eagle.security.util.ExternalDataCache;
+import org.apache.eagle.security.util.ExternalDataJoiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class IPZoneDataJoinBolt extends BaseRichBolt {
+	private static final Logger LOG = LoggerFactory.getLogger(IPZoneDataJoinBolt.class);
+	private Config config;
+	private OutputCollector collector;
+
+	public IPZoneDataJoinBolt(Config config){
+		this.config = config;
+	}
+
+	@Override
+	public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+		this.collector = collector;
+		// start ipzone data polling
+		try {
+			ExternalDataJoiner joiner = new ExternalDataJoiner(IPZonePollingJob.class, config, context.getThisComponentId() + "." + context.getThisTaskIndex());
+			joiner.start();
+		} catch (Exception ex) {
+			LOG.error("Failed to bring up quartz scheduler", ex);
+			throw new IllegalStateException(ex);
+		}
+	}
+
+	@Override
+	public void execute(Tuple input) {
+		try {
+			Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(1);
+			Map<String, Object> event = new TreeMap<String, Object>(toBeCopied); // shallow copy
+			Map<String, IPZoneEntity> map = (Map<String, IPZoneEntity>) ExternalDataCache.getInstance().getJobResult(IPZonePollingJob.class);
+			IPZoneEntity e = null;
+			if (map != null) {
+				e = map.get(event.get("host"));
+			}
+			event.put("securityZone", e == null ? "NA" : e.getSecurityZone());
+			if (LOG.isDebugEnabled()) LOG.debug("After IP zone lookup: " + event);
+			collector.emit(Arrays.asList(event.get("user"), event));
+		} catch (Exception ex) {
+			LOG.error("Error joining data, ignoring this event", ex);
+		} finally {
+			collector.ack(input);
+		}
+    }
+
+	@Override
+	public void declareOutputFields(OutputFieldsDeclarer declarer) {
+		declarer.declare(new Fields("user", "message"));
+	}
+}
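
Both join bolts follow the same pattern: a Quartz polling job periodically stores its latest snapshot in ExternalDataCache keyed by the job class, and the bolt reads that snapshot per tuple, treating null (no completed poll yet) as "no data". A condensed sketch of the read side, using the same calls as execute() above:

    import java.util.Map;
    import org.apache.eagle.security.auditlog.timer.IPZonePollingJob;
    import org.apache.eagle.security.service.IPZoneEntity;
    import org.apache.eagle.security.util.ExternalDataCache;

    public class IpZoneLookupSketch {
        // Same lookup-and-default logic as the bolt: "NA" when the cache is not
        // yet populated or the host has no registered zone.
        @SuppressWarnings("unchecked")
        static String securityZone(String host) {
            Map<String, IPZoneEntity> snapshot = (Map<String, IPZoneEntity>)
                    ExternalDataCache.getInstance().getJobResult(IPZonePollingJob.class);
            IPZoneEntity e = snapshot == null ? null : snapshot.get(host);
            return e == null ? "NA" : e.getSecurityZone();
        }
    }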

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
deleted file mode 100644
index d633dcd..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.auditlog;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.security.auditlog.timer.IPZonePollingJob;
-import org.apache.eagle.security.entity.IPZoneEntity;
-import org.apache.eagle.security.util.ExternalDataCache;
-import org.apache.eagle.security.util.ExternalDataJoiner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-public class IPZoneDataJoinExecutor extends JavaStormStreamExecutor2<String, Map> {
-	private static final Logger LOG = LoggerFactory.getLogger(IPZoneDataJoinExecutor.class);
-	private Config config;
-	
-	@Override
-	public void prepareConfig(Config config) {
-		this.config = config;
-	}
-
-	@Override
-	public void init() {
-		// start IPZone data polling
-		try{
-			ExternalDataJoiner joiner = new ExternalDataJoiner(IPZonePollingJob.class, config, "1");
-			joiner.start();
-		}catch(Exception ex){
-			LOG.error("Fail bring up quartz scheduler", ex);
-			throw new IllegalStateException(ex);
-		}
-	}
-
-    @Override
-    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){
-        Map<String, Object> toBeCopied = (Map<String, Object>)input.get(1);
-        Map<String, Object> event = new TreeMap<String, Object>(toBeCopied); // shallow copy
-        Map<String, IPZoneEntity> map = (Map<String, IPZoneEntity>) ExternalDataCache.getInstance().getJobResult(IPZonePollingJob.class);
-        IPZoneEntity e = null;
-        if(map != null){
-            e = map.get(event.get("host"));
-        }
-        event.put("securityZone",  e == null ? "NA" : e.getSecurityZone());
-        if(LOG.isDebugEnabled()) LOG.debug("After IP zone lookup: " + event);
-        outputCollector.collect(new Tuple2(event.get("user"), event));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
index a4fed79..375edc7 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
@@ -16,10 +16,14 @@
  */
 package org.apache.eagle.security.auditlog.timer;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.security.service.HdfsSensitivityEntity;
+import org.apache.eagle.security.service.IMetadataServiceClient;
+import org.apache.eagle.security.service.MetadataServiceClientImpl;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.entity.FileSensitivityAPIEntity;
 import org.quartz.Job;
@@ -29,9 +33,6 @@ import org.quartz.JobExecutionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import com.google.common.base.Function;
 import com.google.common.collect.Maps;
 
@@ -43,15 +44,15 @@ public class FileSensitivityPollingJob implements Job{
 			throws JobExecutionException {
 		JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
 		try{
-			List<FileSensitivityAPIEntity> ipZones = load(jobDataMap);
-			if(ipZones == null){
+			Collection<HdfsSensitivityEntity> sensitivityAPIEntities = load(jobDataMap);
+			if(sensitivityAPIEntities == null){
 				LOG.warn("File sensitivity information is empty");
 				return;
 			}
-			Map<String, FileSensitivityAPIEntity> map = Maps.uniqueIndex(ipZones, new Function<FileSensitivityAPIEntity, String>(){
+			Map<String, HdfsSensitivityEntity> map = Maps.uniqueIndex(sensitivityAPIEntities, new Function<HdfsSensitivityEntity, String>(){
 				@Override
-				public String apply(FileSensitivityAPIEntity input) {
-					return input.getTags().get("filedir");
+				public String apply(HdfsSensitivityEntity input) {
+					return input.getFiledir();
 				}
 			});
 			ExternalDataCache.getInstance().setJobResult(getClass(), map);
@@ -60,7 +61,7 @@ public class FileSensitivityPollingJob implements Job{
 		}
 	}
 
-	private List<FileSensitivityAPIEntity> load(JobDataMap jobDataMap) throws Exception{
+	private Collection<HdfsSensitivityEntity> load(JobDataMap jobDataMap) throws Exception{
 		Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
 		String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
 		Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
@@ -68,15 +69,7 @@ public class FileSensitivityPollingJob implements Job{
 		String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String)map.get(EagleConfigConstants.PASSWORD) : null;
 		// load from eagle database
 		LOG.info("Load file sensitivity information from eagle service " + eagleServiceHost + ":" + eagleServicePort);
-		IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
-		String query = "FileSensitivityService[]{*}";
-		GenericServiceAPIResponseEntity<FileSensitivityAPIEntity> response = client.search()
-			.pageSize(Integer.MAX_VALUE)
-			.query(query)
-			.send();
-		client.close();
-		if(response.getException() != null)
-			throw new IllegalStateException(response.getException());
-		return response.getObj();
+		IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest");
+		return client.listHdfsSensitivities();
 	}
 }
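
load() pulls the eagle service coordinates out of the Quartz JobDataMap under EagleConfigConstants.EAGLE_SERVICE; how ExternalDataJoiner populates that map is outside this commit. A sketch of the shape load() expects (the host value echoes the provider XML below; the port is illustrative):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.eagle.common.config.EagleConfigConstants;
    import org.quartz.JobDataMap;

    public class PollingJobDataSketch {
        // Builds a JobDataMap matching the lookups in load() above; USERNAME and
        // PASSWORD are optional and default to null when absent.
        public static JobDataMap sample() {
            Map<String, Object> service = new HashMap<String, Object>();
            service.put(EagleConfigConstants.HOST, "server.eagle.apache.org");
            service.put(EagleConfigConstants.PORT, 9090); // illustrative port
            JobDataMap jobDataMap = new JobDataMap();
            jobDataMap.put(EagleConfigConstants.EAGLE_SERVICE, service);
            return jobDataMap;
        }
    }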

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
index 2f7efc8..dc80eb9 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
@@ -16,10 +16,13 @@
  */
 package org.apache.eagle.security.auditlog.timer;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.security.service.IMetadataServiceClient;
+import org.apache.eagle.security.service.MetadataServiceClientImpl;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.quartz.Job;
 import org.quartz.JobDataMap;
@@ -29,7 +32,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.security.entity.IPZoneEntity;
+import org.apache.eagle.security.service.IPZoneEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import com.google.common.base.Function;
@@ -44,7 +47,7 @@ public class IPZonePollingJob implements Job{
 			throws JobExecutionException {
 		JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
 		try{
-			List<IPZoneEntity> ipZones = load(jobDataMap);
+			Collection<IPZoneEntity> ipZones = load(jobDataMap);
 			if(ipZones == null){
 				LOG.warn("Ipzone information is empty");
 				return;
@@ -52,7 +55,7 @@ public class IPZonePollingJob implements Job{
 			Map<String, IPZoneEntity> map = Maps.uniqueIndex(ipZones, new Function<IPZoneEntity, String>(){
 				@Override
 				public String apply(IPZoneEntity input) {
-					return input.getTags().get("iphost");
+					return input.getIphost();
 				}
 			});
 			ExternalDataCache.getInstance().setJobResult(getClass(), map);
@@ -61,7 +64,7 @@ public class IPZonePollingJob implements Job{
 		}
 	}
 
-	private List<IPZoneEntity> load(JobDataMap jobDataMap) throws Exception{
+	private Collection<IPZoneEntity> load(JobDataMap jobDataMap) throws Exception{
 		Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
 		String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
 		Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
@@ -69,15 +72,7 @@ public class IPZonePollingJob implements Job{
 		String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String)map.get(EagleConfigConstants.PASSWORD) : null;
 		// load from eagle database
 		LOG.info("Load ip zone information from eagle service " + eagleServiceHost + ":" + eagleServicePort);
-		IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
-		String query = "IPZoneService[]{*}";
-		GenericServiceAPIResponseEntity<IPZoneEntity> response = client.search()
-			.pageSize(Integer.MAX_VALUE)
-			.query(query)
-			.send();
-		client.close();
-		if(response.getException() != null)
-			throw new IllegalStateException(response.getException());
-		return response.getObj();
+		IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest");
+		return client.listIPZones();
 	}
 }
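
IPZonePollingJob reads the same JobDataMap shape sketched after FileSensitivityPollingJob above, and publishes its result through the same ExternalDataCache contract that IPZoneDataJoinBolt consumes.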

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
new file mode 100644
index 0000000..dadab98
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -0,0 +1,247 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<application>
+    <type>HdfsAuditLogApplication</type>
+    <name>Hdfs Audit Log Monitoring Application</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.security.auditlog.HdfsAuditLogApplication</appClass>
+    <viewPath>/apps/example</viewPath>
+    <configuration>
+        <property>
+            <name>dataSourceConfig.topic</name>
+            <displayName>dataSourceConfig.topic</displayName>
+            <value>hdfs_audit_log</value>
+            <description>data source topic</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkConnection</name>
+            <displayName>dataSourceConfig.zkConnection</displayName>
+            <value>server.eagle.apache.org:2181</value>
+            <description>zookeeper connection string (host:port)</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkConnectionTimeoutMS</name>
+            <displayName>dataSourceConfig.zkConnectionTimeoutMS</displayName>
+            <value>15000</value>
+            <description>zk connection timeout in milliseconds</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.fetchSize</name>
+            <displayName>dataSourceConfig.fetchSize</displayName>
+            <value>1048586</value>
+            <description>kafka fetch size</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKServers</name>
+            <displayName>dataSourceConfig.transactionZKServers</displayName>
+            <value>server.eagle.apache.org</value>
+            <description>zookeeper server for offset transaction</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKPort</name>
+            <displayName>dataSourceConfig.transactionZKPort</displayName>
+            <value>2181</value>
+            <description>zookeeper server port for offset transaction</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKRoot</name>
+            <displayName>dataSourceConfig.transactionZKRoot</displayName>
+            <value>/consumers</value>
+            <description>offset transaction root</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.consumerGroupId</name>
+            <displayName>dataSourceConfig.consumerGroupId</displayName>
+            <value>eagle.hdfsaudit.consumer</value>
+            <description>kafka consumer group Id</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionStateUpdateMS</name>
+            <displayName>dataSourceConfig.transactionStateUpdateMS</displayName>
+            <value>2000</value>
+            <description>zookeeper transaction state update interval in milliseconds</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.schemeCls</name>
+            <displayName>dataSourceConfig.schemeCls</displayName>
+            <value>storm.kafka.StringScheme</value>
+            <description>scheme class</description>
+        </property>
+        <property>
+            <name>topology.numOfSpoutTasks</name>
+            <displayName>topology.numOfSpoutTasks</displayName>
+            <value>2</value>
+            <description>number of spout tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfParserTasks</name>
+            <displayName>topology.numOfParserTasks</displayName>
+            <value>2</value>
+            <description>number of parser tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfJoinTasks</name>
+            <displayName>topology.numOfJoinTasks</displayName>
+            <value>2</value>
+            <description>number of external join tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfSinkTasks</name>
+            <displayName>topology.numOfSinkTasks</displayName>
+            <value>2</value>
+            <description>number of sink tasks</description>
+        </property>
+        <property>
+            <name>eagleProps.dataJoinPollIntervalSec</name>
+            <displayName>eagleProps.dataJoinPollIntervalSec</displayName>
+            <value>30</value>
+            <description>interval in seconds for polling</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.host</name>
+            <displayName>eagleProps.eagleService.host</displayName>
+            <value>localhost</value>
+            <description>eagle service host</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.port</name>
+            <displayName>eagleProps.eagleService.port</displayName>
+            <value>8080</value>
+            <description>eagle service port</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.username</name>
+            <displayName>eagleProps.eagleService.username</displayName>
+            <value>admin</value>
+            <description>eagle service username</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.password</name>
+            <displayName>eagleProps.eagleService.password</displayName>
+            <value>secret</value>
+            <description>eagle service password</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.topic</name>
+            <displayName>dataSinkConfig.topic</displayName>
+            <value>hdfs_audit_log_parsed</value>
+            <description>topic for kafka data sink</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.brokerList</name>
+            <displayName>dataSinkConfig.brokerList</displayName>
+            <value>sandbox.hortonworks.com:6667</value>
+            <description>kafka broker list</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.serializerClass</name>
+            <displayName>dataSinkConfig.serializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class for the Kafka message value</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.keySerializerClass</name>
+            <displayName>dataSinkConfig.keySerializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class for the Kafka message key</description>
+        </property>
+
+        <!-- properties for hdfs file system access and attribute resolver-->
+        <property>
+            <name>fs.defaultFS</name>
+            <displayName>fs.defaultFS</displayName>
+            <value>hdfs://server.eagle.apache.org:8020</value>
+            <description>hdfs endpoint</description>
+        </property>
+    </configuration>
+    <streams>
+        <stream>
+            <streamId>hdfs_audit_log_stream</streamId>
+            <description>HDFS Audit Log Stream</description>
+            <validate>true</validate>
+            <timeseries>true</timeseries>
+            <columns>
+                <column>
+                    <name>action</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>host</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>status</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>timestamp</name>
+                    <type>long</type>
+                </column>
+            </columns>
+        </stream>
+    </streams>
+    <docs>
+        <install>
+# Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+./bin/kafka-topics.sh --create --zookeeper server.eagle.apache.org:2181 --topic example_source_topic --replication-factor 1 --partitions 1
+
+# Step 2: Set up a data collector (e.g. logstash) to flow data into the kafka topic
+
+./bin/logstash -f log_collector.conf
+
+## `log_collector.conf` sample as follows (a minimal sketch; the log path,
+## broker list and topic below are assumptions -- adjust them to your cluster):
+
+input {
+    # tail the namenode audit log (hypothetical path)
+    file {
+        path => "/var/log/hadoop/hdfs/hdfs-audit.log"
+        start_position => "end"
+    }
+}
+filter {
+    # audit lines are shipped as-is; no transformation is needed
+}
+output {
+    # forward each line to the source kafka topic
+    kafka {
+        topic_id => "example_source_topic"
+        broker_list => "server.eagle.apache.org:6667"
+    }
+}
+
+# Step 3: Start the application
+
+# Step 4: Monitor with the featured portal, or alert with policies
+        </install>
+        <uninstall>
+# Step 1: Stop and uninstall the application
+# Step 2: Delete the kafka topic named "${site}_example_source_topic"
+# Step 3: Stop logstash
+        </uninstall>
+    </docs>
+</application>
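
Given the stream definition above, every event on hdfs_audit_log_stream
carries the four declared columns. A hypothetical event, written as a Java map
purely for illustration (the values are made up, echoing the sample audit log
shipped with the tests further below):

    Map<String, Object> event = new HashMap<>();   // java.util imports omitted
    event.put("action", "getfileinfo");
    event.put("host", "/10.0.2.15");
    event.put("status", "allowed");
    event.put("timestamp", 1429879756145L);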

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..42cf62b
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider
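
This registration follows the standard java.util.ServiceLoader contract: the
file is named after the SPI interface and contains the implementation class.
A minimal sketch of how such a provider is typically discovered (Eagle's
actual loading code may differ):

    import java.util.ServiceLoader;
    import org.apache.eagle.app.spi.ApplicationProvider;

    public class ProviderDiscoverySketch {
        public static void main(String[] args) {
            // scans META-INF/services/org.apache.eagle.app.spi.ApplicationProvider on the classpath
            for (ApplicationProvider provider : ServiceLoader.load(ApplicationProvider.class)) {
                System.out.println("found provider: " + provider.getClass().getName());
            }
        }
    }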

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index 3c3572e..efa6467 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -14,56 +14,42 @@
 # limitations under the License.
 
 {
-  "envContextConfig" : {
-    "env" : "storm",
-    "mode" : "local",
-    "topologyName" : "auditLogProcessTopology",
-    "stormConfigFile" : "security-auditlog-storm.yaml",
-    "parallelismConfig" : {
-      "kafkaMsgConsumer" : 1,
-      "hdfsAuditLogAlertExecutor*" : 1
-    }
+  "appId" : "HdfsAuditLogApp",
+  "mode" : "LOCAL",
+  "siteId" : "testsite",
+  "topology" : {
+    "numOfTotalWorkers" : 2,
+    "numOfSpoutTasks" : 2,
+    "numOfParserTasks" : 2,
+    "numOfSensitivityJoinTasks" : 2,
+    "numOfIPZoneJoinTasks" : 2,
+    "numOfSinkTasks" : 2
   },
   "dataSourceConfig": {
-    "topic" : "sandbox_hdfs_audit_log",
-    "zkConnection" : "sandbox.hortonworks.com:2181",
+    "topic" : "hdfs_audit_log",
+    "zkConnection" : "server.eagle.apache.org:2181",
     "zkConnectionTimeoutMS" : 15000,
     "consumerGroupId" : "EagleConsumer",
     "fetchSize" : 1048586,
-    "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
-    "transactionZKServers" : "sandbox.hortonworks.com",
+    "transactionZKServers" : "server.eagle.apache.org",
     "transactionZKPort" : 2181,
     "transactionZKRoot" : "/consumers",
-    "transactionStateUpdateMS" : 2000
-  },
-  "alertExecutorConfigs" : {
-     "hdfsAuditLogAlertExecutor" : {
-       "parallelism" : 1,
-       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-       "needValidation" : "true"
-     }
+    "transactionStateUpdateMS" : 2000,
+    "schemeCls" : "storm.kafka.StringScheme"
   },
   "eagleProps" : {
-    "site" : "sandbox",
-    "application": "hdfsAuditLog",
   	"dataJoinPollIntervalSec" : 30,
-    "mailHost" : "mailHost.com",
-    "mailSmtpPort":"25",
-    "mailDebug" : "true",
-    "balancePartitionEnabled" : true,
-    #"partitionRefreshIntervalInMin" : 60,
-    #"kafkaStatisticRangeInMin" : 60,
     "eagleService": {
       "host": "localhost",
-      "port": 38080,
+      "port": 9090,
       "username": "admin",
       "password": "secret"
-    },
-    "readHdfsUserCommandPatternFrom" : "file"
+    }
   },
-  "dynamicConfigSource" : {
-  	"enabled" : true,
-  	"initDelayMillis" : 0,
-  	"delayMillis" : 30000
+  "dataSinkConfig": {
+    "topic" : "hdfs_audit_log_parsed",
+    "brokerList" : "server.eagle.apache.org:6667",
+    "serializerClass" : "kafka.serializer.StringEncoder",
+    "keySerializerClass" : "kafka.serializer.StringEncoder"
   }
-}
\ No newline at end of file
+}
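
The flat topology.* keys above replace the old parallelismConfig map, so they
can be read directly through the typesafe Config API. A small sketch (the key
names come from the file above; the surrounding application wiring is not
shown here):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    public class TopologyConfigSketch {
        public static void main(String[] args) {
            // ConfigFactory.load() picks up application.conf from the classpath
            Config config = ConfigFactory.load();
            int spoutTasks  = config.getInt("topology.numOfSpoutTasks");
            int parserTasks = config.getInt("topology.numOfParserTasks");
            System.out.println("spout=" + spoutTasks + ", parser=" + parserTasks);
        }
    }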

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
index 4a22987..e442c46 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
@@ -13,17 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=DEBUG, stdout, DRFA
+log4j.rootLogger=INFO, stdout, DRFA
 
 eagle.log.dir=./logs
 eagle.log.file=eagle.log
 
 
-#log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
-#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
-log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG
-#log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG
-# standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
deleted file mode 100644
index a68a323..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-topology.workers: 1
-topology.acker.executors: 1
-topology.tasks: 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
index a19e9b6..753eb41 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
@@ -22,6 +22,8 @@ package org.apache.eagle.security.auditlog;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
+import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
 import org.junit.Assert;
 import org.junit.Test;
 import scala.Tuple2;
@@ -32,9 +34,18 @@ import java.util.*;
  * Created by yonzhang on 11/24/15.
  */
 public class TestUserCommandReassembler {
-    private Map<String, Object> parseEvent(String log){
-        HdfsAuditLogKafkaDeserializer deserializer = new HdfsAuditLogKafkaDeserializer(null);
-        return (Map<String, Object>)deserializer.deserialize(log.getBytes());
+    private Map<String, Object> parseEvent(String log) throws Exception{
+        HDFSAuditLogParser deserializer = new HDFSAuditLogParser();
+        HDFSAuditLogObject entity = deserializer.parse(log);
+        Map<String, Object> map = new TreeMap<String, Object>();
+        map.put("src", entity.src);
+        map.put("dst", entity.dst);
+        map.put("host", entity.host);
+        map.put("timestamp", entity.timestamp);
+        map.put("allowed", entity.allowed);
+        map.put("user", entity.user);
+        map.put("cmd", entity.cmd);
+        return map;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog b/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog
new file mode 100644
index 0000000..361304d
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog
@@ -0,0 +1,17 @@
+2015-04-24 12:49:16,145 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/tmp	dst=null	perm=null	proto=rpc
+2015-04-24 12:49:16,192 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/user/ambari-qa	dst=null	perm=null	proto=rpc
+2015-04-24 12:49:20,518 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/tmp	dst=null	perm=null	proto=rpc
+2015-04-24 12:49:20,570 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/user/ambari-qa	dst=null	perm=null	proto=rpc
+2015-04-24 12:49:20,587 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/	dst=null	perm=null	proto=rpc
+2015-04-24 12:49:20,664 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=mkdirs	src=/tmp	dst=null	perm=hdfs:hdfs:rwxr-xr-x	proto=rpc
+2015-04-24 12:49:20,677 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/user	dst=null	perm=null	proto=rpc
+2015-04-24 12:49:20,686 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=mkdirs	src=/user/ambari-qa	dst=null	perm=hdfs:hdfs:rwxr-xr-x	proto=rpc
+2015-04-24 12:49:24,828 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/tmp	dst=null	perm=null	proto=rpc
+2015-04-24 12:49:24,915 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=setPermission	src=/tmp	dst=null	perm=hdfs:hdfs:rwxrwxrwx	proto=rpc
+2015-04-24 12:49:29,375 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/user/ambari-qa	dst=null	perm=null	proto=rpc
+2015-04-24 12:49:29,453 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=setPermission	src=/user/ambari-qa	dst=null	perm=hdfs:hdfs:rwxrwx---	proto=rpc
+2015-04-24 12:49:33,542 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/tmp	dst=null	perm=null	proto=rpc
+2015-04-24 12:49:37,844 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/user/ambari-qa	dst=null	perm=null	proto=rpc
+2015-04-24 12:49:37,929 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=setOwner	src=/user/ambari-qa	dst=null	perm=ambari-qa:hdfs:rwxrwx---	proto=rpc
+2015-04-24 12:51:31,798 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/apps/hbase/data	dst=null	perm=null	proto=rpc
+2015-04-24 12:51:31,863 INFO FSNamesystem.audit: allowed=true	ugi=hdfs (auth:SIMPLE)	ip=/10.0.2.15	cmd=getfileinfo	src=/apps/hbase/staging	dst=null	perm=null	proto=rpc
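
Each line above follows the namenode audit format: a timestamp and log level,
then tab-separated key=value pairs starting at "allowed=". A standalone
parsing sketch for illustration (this is not the project's HDFSAuditLogParser,
just a demonstration of the field layout):

    import java.util.HashMap;
    import java.util.Map;

    public class AuditLineSketch {
        public static Map<String, String> parse(String line) {
            Map<String, String> fields = new HashMap<>();
            // keep only the key=value section after the audit marker
            String body = line.substring(line.indexOf("allowed="));
            for (String pair : body.split("\t")) {
                int eq = pair.indexOf('=');
                fields.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
            return fields; // e.g. parse(line).get("cmd") -> "getfileinfo"
        }
    }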

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java
index b2a2671..a0a230a 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java
@@ -16,6 +16,8 @@
  */
 package org.apache.eagle.service.security.hdfs.resolver;
 
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
 import org.apache.eagle.service.alert.resolver.AttributeResolvable;
 import org.apache.eagle.service.alert.resolver.AttributeResolveException;
 import org.apache.eagle.service.alert.resolver.BadAttributeResolveRequestException;
@@ -33,6 +35,10 @@ import java.util.regex.Pattern;
 public class HDFSCommandResolver implements AttributeResolvable<GenericAttributeResolveRequest,String> {
     private final static Logger LOG = LoggerFactory.getLogger(HDFSCommandResolver.class);
 
+    public HDFSCommandResolver(ApplicationEntityService entityService, Config eagleServerConfig){
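+        // nothing to initialize here; the (entityService, config) signature matches the other resolvers so they can all be constructed the same way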
+
+    }
+    
     private final static String [] cmdStrs = {"open", "create", "append", "delete", "listfileinfo", "rename",
             "mkdirs", "listStatus", "setReplication", "setOwner", "setPermission", "setTimes", "setXAttr", "removeXAttr", "getXAttrs",
             "contentSummary", "createEncryptionZone", "checkAccess"};

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java
index 4326c93..370d9a3 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java
@@ -18,16 +18,19 @@ package org.apache.eagle.service.security.hdfs.resolver;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.security.resolver.MetadataAccessConfigRepo;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
 import org.apache.eagle.service.alert.resolver.AttributeResolvable;
 import org.apache.eagle.service.alert.resolver.AttributeResolveException;
 import org.apache.eagle.service.alert.resolver.BadAttributeResolveRequestException;
 import org.apache.eagle.service.alert.resolver.GenericAttributeResolveRequest;
 
+import org.apache.eagle.service.security.hdfs.rest.HDFSResourceWebResource;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.slf4j.Logger;
@@ -45,6 +48,12 @@ import org.apache.eagle.service.security.hdfs.HDFSResourceConstants;
  */
 public class HDFSResourceResolver  implements AttributeResolvable<GenericAttributeResolveRequest,String> {
 	private final static Logger LOG = LoggerFactory.getLogger(HDFSResourceResolver.class);
+	private ApplicationEntityService entityService;
+
+	public HDFSResourceResolver(ApplicationEntityService entityService, Config eagleServerConfig){
+		this.entityService = entityService;
+	}
+
 	/**
 	 * HDFS Resource Resolve API
 	 *
@@ -54,10 +63,9 @@ public class HDFSResourceResolver  implements AttributeResolvable<GenericAttribu
 	public List<String> resolve(GenericAttributeResolveRequest request)
 			throws AttributeResolveException {
 		List<String> result = new ArrayList<>();
-		MetadataAccessConfigRepo repo = new MetadataAccessConfigRepo();
 		try {
-			Config config = repo.getConfig(HDFSResourceConstants.HDFS_APPLICATION, request.getSite().trim());
-			Configuration conf = repo.convert(config);
+			Map<String, Object> config = getAppConfig(request.getSite(), HDFSResourceWebResource.HDFS_APPLICATION);
+			Configuration conf = convert(config);
 			HDFSFileSystem fileSystem = new HDFSFileSystem(conf);
 			String query = request.getQuery().trim();
 			List<FileStatus> fileStatuses = null;
@@ -86,6 +94,19 @@ public class HDFSResourceResolver  implements AttributeResolvable<GenericAttribu
 		}
 	}
 
+	private Map<String, Object> getAppConfig(String site, String appType){
+		ApplicationEntity entity = entityService.getBySiteIdAndAppType(site, appType);
+		return entity.getConfiguration();
+	}
+
+	private Configuration convert(Map<String, Object> originalConfig) throws Exception {
+		Configuration config = new Configuration();
+		for (Map.Entry<String, Object> entry : originalConfig.entrySet()) {
+			config.set(entry.getKey(), entry.getValue().toString());
+		}
+		return config;
+	}
+
 	/**
 	 * Validate the Passed Request Object
 	 * It should have Site Id and File Path

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java
index f1d8808..5f3ec54 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java
@@ -16,6 +16,13 @@
  */
 package org.apache.eagle.service.security.hdfs.resolver;
 
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+import org.apache.eagle.security.service.HBaseSensitivityEntity;
+import org.apache.eagle.security.service.HdfsSensitivityEntity;
+import org.apache.eagle.security.service.ISecurityMetadataDAO;
+import org.apache.eagle.security.service.MetadataDaoFactory;
 import org.apache.eagle.service.alert.resolver.AttributeResolvable;
 import org.apache.eagle.service.alert.resolver.AttributeResolveException;
 import org.apache.eagle.service.alert.resolver.BadAttributeResolveRequestException;
@@ -24,16 +31,17 @@ import org.apache.eagle.service.security.hdfs.HDFSResourceSensitivityService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.regex.Pattern;
 
 public class HDFSSensitivityTypeResolver implements AttributeResolvable<GenericAttributeResolveRequest,String> {
     private final static Logger LOG = LoggerFactory.getLogger(HDFSSensitivityTypeResolver.class);
-    private HDFSResourceSensitivityService dao = new HDFSResourceSensitivityService();
-    private Map<String, Map<String, String>> maps = dao.getAllFileSensitivityMap();
+    private ISecurityMetadataDAO dao;
 
+    @Inject
+    public HDFSSensitivityTypeResolver(ApplicationEntityService entityService, Config eagleServerConfig){
+        dao = MetadataDaoFactory.getMetadataDAO(eagleServerConfig);
+    }
 
     private final static String SENSITIVETYPE_ATTRIBUTE_RESOLVE_FORMAT_HINT = "Sensitive type should be composed of a-z, A-Z, 0-9 or -";
 
@@ -41,6 +49,7 @@ public class HDFSSensitivityTypeResolver implements AttributeResolvable<GenericA
         String query = request.getQuery().trim();
         String site = request.getSite().trim();
         List<String> res = new ArrayList<>();
+        Map<String, Map<String, String>> maps = getAllSensitivities();
         Map<String, String> map = maps.get(site);
 
         if(map == null) {
@@ -72,4 +81,16 @@ public class HDFSSensitivityTypeResolver implements AttributeResolvable<GenericA
     public Class<GenericAttributeResolveRequest> getRequestClass() {
         return GenericAttributeResolveRequest.class;
     }
+
+    private Map<String, Map<String, String>> getAllSensitivities(){
+        Map<String, Map<String, String>> all = new HashMap<>();
+        Collection<HdfsSensitivityEntity> entities = dao.listHdfsSensitivities();
+        for(HdfsSensitivityEntity entity : entities){
+            if(!all.containsKey(entity.getSite())){
+                all.put(entity.getSite(), new HashMap<>());
+            }
+            all.get(entity.getSite()).put(entity.getFiledir(), entity.getSensitivityType());
+        }
+        return all;
+    }
 }
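
The map built above is keyed first by site, then by file/dir path. A short
usage sketch (the site id and path are hypothetical; Collections is java.util):

    Map<String, Map<String, String>> all = getAllSensitivities();
    Map<String, String> siteMap = all.getOrDefault("testsite", Collections.emptyMap());
    // null when the path carries no sensitivity tag
    String sensitivity = siteMap.get("/tmp/private");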