You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/13 06:46:20 UTC

[2/2] incubator-eagle git commit: EAGLE-444 convert eagle-gc app to use new app framework convert eagle-gc app to use new app framework

EAGLE-444 convert eagle-gc app to use new app framework
convert eagle-gc app to use new app framework

Author: @yonzhang2012 <yo...@gmail.com>
Closes: #339


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/98458658
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/98458658
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/98458658

Branch: refs/heads/develop
Commit: 984586580b3840f254f36e2f24a786c1eca98370
Parents: 15e1c83
Author: yonzhang <yo...@gmail.com>
Authored: Fri Aug 12 23:50:10 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Fri Aug 12 23:50:10 2016 -0700

----------------------------------------------------------------------
 .../kafka/NewKafkaSourcedSpoutProvider.java     | 118 ++++++++++
 eagle-gc/pom.xml                                |   5 +
 .../org/apache/eagle/gc/GCLogApplication.java   |  79 +++++++
 .../eagle/gc/GCLogApplicationProvider.java      |  34 +++
 .../org/apache/eagle/gc/GCLogProcessorMain.java |  42 ----
 .../gc/executor/GCLogAnalysorExecutor.java      |  77 -------
 .../eagle/gc/executor/GCLogAnalyzerBolt.java    |  85 +++++++
 .../gc/executor/GCMetricGeneratorBolt.java      | 125 +++++++++++
 .../gc/executor/GCMetricGeneratorExecutor.java  | 108 ---------
 .../eagle/gc/spout/GCLogDeserializer.java       |  39 ----
 ....security.hbase.GCLogApplicationProvider.xml | 221 +++++++++++++++++++
 ...org.apache.eagle.app.spi.ApplicationProvider |  37 ++++
 eagle-gc/src/main/resources/application.conf    |  48 ++--
 eagle-gc/src/main/resources/gc-storm.yaml       |  18 --
 eagle-gc/src/main/resources/log4j.properties    |   3 -
 eagle-hadoop-metric/pom.xml                     |   5 +
 .../hadoop/metric/HadoopJmxApplication.java     |  42 ++++
 .../hadoop/metric/HadoopJmxMetricMonitor.java   |  11 +-
 .../HadoopJmxMetricMonitoringTopology.java      |  37 ----
 .../kafka/EagleMetricCollectorApplication.java  | 148 +++++++++++++
 .../metric/kafka/EagleMetricCollectorMain.java  | 122 ----------
 .../kafka/KafkaMessageDistributionBolt.java     | 114 ++++++++++
 .../kafka/KafkaMessageDistributionExecutor.java | 103 ---------
 .../topo/NewKafkaSourcedSpoutProvider.java      | 115 ----------
 .../hbase/HBaseAuditLogAppProvider.java         |   7 -
 .../hbase/HBaseAuditLogApplication.java         |   3 +-
 .../AbstractHdfsAuditLogApplication.java        |   3 +-
 .../securitylog/HdfsAuthLogMonitoringMain.java  |   3 +-
 .../eagle-security-oozie-auditlog/pom.xml       |   5 +
 .../oozie/parse/OozieAuditLogApplication.java   |  75 +++++++
 .../oozie/parse/OozieAuditLogParserBolt.java    |  86 ++++++++
 .../oozie/parse/OozieAuditLogProcessorMain.java |  33 ---
 .../OozieResourceSensitivityDataJoinBolt.java   | 108 +++++++++
 ...ozieResourceSensitivityDataJoinExecutor.java |  89 --------
 34 files changed, 1315 insertions(+), 833 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
new file mode 100644
index 0000000..d764ac1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
@@ -0,0 +1,118 @@
+/*
+ *
+ *  *
+ *  *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *    contributor license agreements.  See the NOTICE file distributed with
+ *  *    this work for additional information regarding copyright ownership.
+ *  *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *  *    (the "License"); you may not use this file except in compliance with
+ *  *    the License.  You may obtain a copy of the License at
+ *  *
+ *  *       http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *    Unless required by applicable law or agreed to in writing, software
+ *  *    distributed under the License is distributed on an "AS IS" BASIS,
+ *  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *    See the License for the specific language governing permissions and
+ *  *    limitations under the License.
+ *  *
+ *
+ */
+
+package org.apache.eagle.dataproc.impl.storm.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.Arrays;
+
+/**
+ * Builds a {@link KafkaSpout} from the Kafka settings found in a typesafe {@link Config}. Since 6/8/16.
+ */
+public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider {
+    private final static Logger LOG = LoggerFactory.getLogger(NewKafkaSourcedSpoutProvider.class);
+    // Path of the sub-config holding the Kafka settings; when null, getSpout reads them straight from the config it is given.
+    private String configPrefix = "dataSourceConfig";
+
+    public NewKafkaSourcedSpoutProvider(){} // keeps the default prefix "dataSourceConfig"
+
+    public NewKafkaSourcedSpoutProvider(String prefix){ // prefix replaces the default; may be null
+        this.configPrefix = prefix;
+    }
+    // Reads the settings below from config and returns a new KafkaSpout; throws IllegalStateException if "schemeCls" is absent or cannot be instantiated.
+    @Override
+    public BaseRichSpout getSpout(Config config){
+        Config context = config;
+        if(this.configPrefix!=null) context = config.getConfig(configPrefix); // narrow to the prefixed sub-config
+        // Kafka topic
+        String topic = context.getString("topic");
+        // Kafka consumer group id
+        String groupId = context.getString("consumerGroupId");
+        // Kafka fetch size
+        int fetchSize = context.getInt("fetchSize");
+        // Kafka broker zk connection
+        String zkConnString = context.getString("zkConnection");
+        // transaction zkRoot
+        String zkRoot = context.getString("transactionZKRoot");
+
+        LOG.info(String.format("Use topic id: %s",topic));
+
+        String brokerZkPath = null; // optional setting
+        if(context.hasPath("brokerZkPath")) {
+            brokerZkPath = context.getString("brokerZkPath");
+        }
+
+        BrokerHosts hosts;
+        if(brokerZkPath == null) {
+            hosts = new ZkHosts(zkConnString);
+        } else {
+            hosts = new ZkHosts(zkConnString, brokerZkPath);
+        }
+
+        SpoutConfig spoutConfig = new SpoutConfig(hosts,
+                topic,
+                zkRoot + "/" + topic,
+                groupId);
+
+        // transaction zkServers
+        spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
+        // transaction zkPort
+        spoutConfig.zkPort = context.getInt("transactionZKPort");
+        // transaction update interval
+        spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
+        // Kafka fetch size
+        spoutConfig.fetchSizeBytes = fetchSize;
+        // "startOffsetTime" is for test usage, prod should not use this
+        if (context.hasPath("startOffsetTime")) {
+            spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
+        }
+        // "forceFromStart" is for test usage, prod should not use this. Only the key's presence is checked (its value is never read), and it overrides "startOffsetTime".
+        if (context.hasPath("forceFromStart")) {
+            spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+        }
+        // The scheme class is mandatory; it is loaded by name and created through Class.newInstance().
+        if (context.hasPath("schemeCls")) {
+            try {
+                Scheme s = (Scheme)Class.forName(context.getString("schemeCls")).newInstance();
+                spoutConfig.scheme = new SchemeAsMultiScheme(s);
+            }catch(Exception ex){
+                LOG.error("error instantiating scheme object"); // the cause is carried by the exception thrown next
+                throw new IllegalStateException(ex);
+            }
+        }else{
+            String err = "schemeCls must be present";
+            LOG.error(err);
+            throw new IllegalStateException(err);
+        }
+        return new KafkaSpout(spoutConfig);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-gc/pom.xml b/eagle-gc/pom.xml
index 66400cb..cb31156 100644
--- a/eagle-gc/pom.xml
+++ b/eagle-gc/pom.xml
@@ -46,5 +46,10 @@
       <artifactId>eagle-alert-process</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.eagle</groupId>
+      <artifactId>eagle-app-base</artifactId>
+      <version>${project.version}</version>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
new file mode 100644
index 0000000..86d8bc4
--- /dev/null
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
@@ -0,0 +1,79 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+
+package org.apache.eagle.gc;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider;
+import org.apache.eagle.gc.executor.GCLogAnalyzerBolt;
+import org.apache.eagle.gc.executor.GCMetricGeneratorBolt;
+import storm.kafka.StringScheme;
+
+/**
+ * Storm application that wires the GC log topology from a Kafka spout through two bolts into a stream sink. Since 8/12/16.
+ */
+public class GCLogApplication extends StormApplication{
+    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks"; // these four config keys are read in execute and passed to setSpout/setBolt
+    public final static String ANALYZER_TASK_NUM = "topology.numOfAnalyzerTasks";
+    public final static String GENERATOR_TASK_NUM = "topology.numOfGeneratorTasks";
+    public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+    // Assembles the topology: spout "ingest" -> "analyzerBolt" -> "generatorBolt" -> "kafkaSink".
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
+        NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
+        IRichSpout spout = provider.getSpout(config);
+
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfAnalyzerTasks = config.getInt(ANALYZER_TASK_NUM);
+        int numOfGeneratorTasks = config.getInt(GENERATOR_TASK_NUM);
+        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
+        builder.setSpout("ingest", spout, numOfSpoutTasks);
+
+        GCLogAnalyzerBolt bolt = new GCLogAnalyzerBolt();
+        BoltDeclarer boltDeclarer = builder.setBolt("analyzerBolt", bolt, numOfAnalyzerTasks);
+        boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
+        // NOTE(review): GCLogAnalyzerBolt emits the constant "GCLog" as "f1", so grouping on "f1" below presumably sends every tuple to one task — confirm this is intended.
+        GCMetricGeneratorBolt generatorBolt = new GCMetricGeneratorBolt(config);
+        BoltDeclarer joinBoltDeclarer = builder.setBolt("generatorBolt", generatorBolt, numOfGeneratorTasks);
+        joinBoltDeclarer.fieldsGrouping("analyzerBolt", new Fields("f1"));
+
+        StormStreamSink sinkBolt = environment.getStreamSink("gc_log_stream",config);
+        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
+        kafkaBoltDeclarer.fieldsGrouping("generatorBolt", new Fields("f1"));
+        return builder.createTopology();
+    }
+    // Standalone entry point: loads the configuration with ConfigFactory.load() and runs the application with it.
+    public static void main(String[] args){
+        Config config = ConfigFactory.load();
+        GCLogApplication app = new GCLogApplication();
+        app.run(config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java
new file mode 100644
index 0000000..eae170a
--- /dev/null
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplicationProvider.java
@@ -0,0 +1,34 @@
+/*
+ *
+ *  *
+ *  *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *    contributor license agreements.  See the NOTICE file distributed with
+ *  *    this work for additional information regarding copyright ownership.
+ *  *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *  *    (the "License"); you may not use this file except in compliance with
+ *  *    the License.  You may obtain a copy of the License at
+ *  *
+ *  *       http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *    Unless required by applicable law or agreed to in writing, software
+ *  *    distributed under the License is distributed on an "AS IS" BASIS,
+ *  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *    See the License for the specific language governing permissions and
+ *  *    limitations under the License.
+ *  *
+ *
+ */
+
+package org.apache.eagle.gc;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+/**
+ * Application provider for the GC log application: every getApplication() call returns a new {@link GCLogApplication}. Since 8/12/16.
+ */
+public class GCLogApplicationProvider extends AbstractApplicationProvider<GCLogApplication> {
+    @Override
+    public GCLogApplication getApplication() {
+        return new GCLogApplication();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java
deleted file mode 100644
index 278a5bc..0000000
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogProcessorMain.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-package org.apache.eagle.gc;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.gc.executor.GCLogAnalysorExecutor;
-import org.apache.eagle.gc.executor.GCMetricGeneratorExecutor;
-
-public class GCLogProcessorMain {
-
-    public static void main(String[] args) throws Exception{
-        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
-        Config config = env.getConfig();
-        KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider();
-        GCLogAnalysorExecutor logAnalysor = new GCLogAnalysorExecutor();
-        env.fromSpout(provider.getSpout(config)).withOutputFields(1).nameAs("kafkaMsgConsumer")
-                                    .flatMap(logAnalysor)
-                                    .flatMap(new GCMetricGeneratorExecutor())
-                                    .alertWithConsumer("NNGCLogStream", "NNGCAlert");
-        env.execute();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java
deleted file mode 100644
index e74bc44..0000000
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalysorExecutor.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-package org.apache.eagle.gc.executor;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.gc.model.GCPausedEvent;
-import org.apache.eagle.gc.stream.GCStreamBuilder;
-import org.apache.eagle.gc.parser.exception.IgnoredLogFormatException;
-import org.apache.eagle.gc.parser.exception.UnrecognizedLogFormatException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.List;
-import java.util.Map;
-
-public class GCLogAnalysorExecutor extends JavaStormStreamExecutor2<String, Map> {
-
-    public final static Logger LOG = LoggerFactory.getLogger(GCLogAnalysorExecutor.class);
-
-    private Config config;
-
-    private long previousLogTime;
-
-    @Override
-    public void prepareConfig(Config config) {
-        this.config = config;
-    }
-
-    @Override
-    public void init() {
-
-    }
-
-    @Override
-    public void flatMap(List<Object> input, Collector<Tuple2<String, Map>> collector) {
-        String log = (String)input.get(0);
-        GCStreamBuilder builder = new GCStreamBuilder();
-        try {
-            GCPausedEvent pauseEvent = builder.build(log);
-            // Because some gc log like concurrent mode failure may miss timestamp info, so we set the previous log's timestamp for it
-            if (pauseEvent.getTimestamp() == 0) {
-                pauseEvent.setTimestamp(previousLogTime);
-            }
-            previousLogTime = pauseEvent.getTimestamp();
-            collector.collect(new Tuple2("GCLog", pauseEvent.toMap()));
-        }
-        catch (IgnoredLogFormatException ex1) {
-            //DO nothing
-        }
-        catch (UnrecognizedLogFormatException ex2) {
-            LOG.warn(ex2.getMessage());
-        }
-        catch (Exception ex3) {
-            LOG.error("Got an exception when parsing log: ", ex3);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java
new file mode 100644
index 0000000..59720a3
--- /dev/null
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCLogAnalyzerBolt.java
@@ -0,0 +1,85 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.gc.executor;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.gc.model.GCPausedEvent;
+import org.apache.eagle.gc.stream.GCStreamBuilder;
+import org.apache.eagle.gc.parser.exception.IgnoredLogFormatException;
+import org.apache.eagle.gc.parser.exception.UnrecognizedLogFormatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class GCLogAnalyzerBolt extends BaseRichBolt { // Storm bolt that builds GCPausedEvent maps from GC log strings
+    public final static Logger LOG = LoggerFactory.getLogger(GCLogAnalyzerBolt.class);
+    private OutputCollector collector;
+    private long previousLogTime; // timestamp of the most recently built event, reused for events that carry none
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("f1", "f2")); // f1 = the literal "GCLog", f2 = the event map (see execute)
+    }
+    /**
+     * Builds a GCPausedEvent from field 0 of the tuple and emits ("GCLog", event map); the tuple is acked in every case.
+     */
+    @Override
+    public void execute(Tuple input) {
+        String log = input.getString(0);
+        GCStreamBuilder builder = new GCStreamBuilder();
+        try {
+            GCPausedEvent pauseEvent = builder.build(log);
+            // Some GC logs (e.g. concurrent mode failure) may lack timestamp info, so fall back to the previous log's timestamp
+            if (pauseEvent.getTimestamp() == 0) {
+                pauseEvent.setTimestamp(previousLogTime);
+            }
+            previousLogTime = pauseEvent.getTimestamp();
+            collector.emit(Arrays.asList("GCLog", pauseEvent.toMap())); // NOTE(review): not anchored to input — confirm replay on downstream failure is not needed
+        }
+        catch (IgnoredLogFormatException ex1) {
+            // Deliberately ignored: nothing is emitted or logged for this exception
+        }
+        catch (UnrecognizedLogFormatException ex2) {
+            LOG.warn(ex2.getMessage());
+        }
+        catch (Exception ex3) {
+            LOG.error("Got an exception when parsing log: ", ex3);
+        }finally {
+            collector.ack(input);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java
new file mode 100644
index 0000000..2d1023b
--- /dev/null
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorBolt.java
@@ -0,0 +1,125 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.gc.executor;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.common.config.EagleConfigHelper;
+import org.apache.eagle.datastream.*;
+import org.apache.eagle.gc.common.GCConstants;
+import org.apache.eagle.gc.model.GCPausedEvent;
+import org.apache.eagle.metric.reportor.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.*;
+
+public class GCMetricGeneratorBolt extends BaseRichBolt {
+    // Storm bolt that derives GC metrics from each incoming event map and then forwards the tuple's two values downstream.
+    public final static Logger LOG = LoggerFactory.getLogger(GCMetricGeneratorBolt.class);
+    private Config config;
+    private MetricRegistry registry;
+    private String gcPausedTimeMetricName;
+    private String youngHeapUsageMetricName;
+    private String tenuredHeapUsageMetricName;
+    private String totalHeapUsageMetricName;
+    private Map<String, String> dimensions;
+    private List<EagleMetric> metrics = new ArrayList<>(); // reused buffer: filled and cleared on every execute call
+
+    private EagleServiceReporterMetricListener listener;
+    private OutputCollector collector;
+    // The config is only stored here; it is read later in prepare().
+    public GCMetricGeneratorBolt(Config config){
+        this.config = config;
+    }
+    // Creates the metric registry and the service listener from config and precomputes the pause-time metric key.
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        registry = new MetricRegistry();
+        String host = EagleConfigHelper.getServiceHost(config);
+        int port = EagleConfigHelper.getServicePort(config);
+        String username = EagleConfigHelper.getServiceUser(config);
+        String password = EagleConfigHelper.getServicePassword(config);
+        listener = new EagleServiceReporterMetricListener(host, port, username, password);
+        dimensions = new HashMap<>();
+        dimensions.put(EagleConfigConstants.SITE, EagleConfigHelper.getSite(config));
+        dimensions.put(EagleConfigConstants.APPLICATION, EagleConfigHelper.getApplication(config));
+        gcPausedTimeMetricName = MetricKeyCodeDecoder.codeMetricKey(GCConstants.GC_PAUSE_TIME_METRIC_NAME, dimensions);
+
+        this.collector = collector;
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("f1", "f2")); // execute forwards input values 0 and 1 under these names
+    }
+    public void registerMetricIfMissing(String metricName, EagleMetric metric) { // attaches the listener and registers metric only when the name is not in the registry yet
+        if (registry.getMetrics().get(metricName) == null) {
+            metric.registerListener(listener);
+            registry.register(metricName, metric);
+        }
+    }
+    @Override
+    public void execute(Tuple input) {
+        try {
+            Map<String, Object> map = (Map<String, Object>) input.getValue(1);
+            GCPausedEvent event = new GCPausedEvent(map);
+            // Generate gc paused time metric
+            EagleCounterMetric metric = new EagleCounterMetric(event.getTimestamp(), gcPausedTimeMetricName, event.getPausedGCTimeSec(), GCConstants.GC_PAUSE_TIME_METRIC_GRANULARITY);
+            registerMetricIfMissing(gcPausedTimeMetricName, metric); // NOTE(review): the name is fixed in prepare(), so later counter instances are not registered here — confirm their values are still reported
+
+            // Generate young heap usage metric
+            if (event.isYoungAreaGCed()) {
+                youngHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_YOUNG_MEMORY_METRIC_NAME, dimensions);
+                EagleGaugeMetric metric2 = new EagleGaugeMetric(event.getTimestamp(), youngHeapUsageMetricName, event.getYoungUsedHeapK());
+                metrics.add(metric2);
+            }
+
+            // Generate tenured heap usage metric
+            if (event.isTenuredAreaGCed()) {
+                tenuredHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TENURED_MEMORY_METRIC_NAME, dimensions);
+                EagleGaugeMetric metric3 = new EagleGaugeMetric(event.getTimestamp(), tenuredHeapUsageMetricName, event.getTenuredUsedHeapK());
+                metrics.add(metric3);
+            }
+
+            // Generate total heap usage metric
+            if (event.isTotalHeapUsageAvailable()) {
+                totalHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TOTAL_MEMORY_METRIC_NAME, dimensions);
+                EagleGaugeMetric metric4 = new EagleGaugeMetric(event.getTimestamp(), totalHeapUsageMetricName, event.getUsedTotalHeapK());
+                metrics.add(metric4);
+            }
+            listener.onMetricFlushed(metrics); // hand the gauges collected above to the listener
+            metrics.clear();
+            collector.emit(Arrays.asList(input.getValue(0), input.getValue(1))); // forward both input values unchanged
+        }catch(Exception ex){
+            LOG.error("error in gc metric generating", ex);
+        }finally {
+            collector.ack(input); // acked whether or not metric generation succeeded
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java b/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java
deleted file mode 100644
index ebc5a6c..0000000
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/executor/GCMetricGeneratorExecutor.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-package org.apache.eagle.gc.executor;
-
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.common.config.EagleConfigHelper;
-import org.apache.eagle.datastream.*;
-import org.apache.eagle.gc.common.GCConstants;
-import org.apache.eagle.gc.model.GCPausedEvent;
-import org.apache.eagle.metric.reportor.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.*;
-
-public class GCMetricGeneratorExecutor extends JavaStormStreamExecutor2<String, Map> {
-
-    public final static Logger LOG = LoggerFactory.getLogger(GCMetricGeneratorExecutor.class);
-    private Config config;
-    private MetricRegistry registry;
-    private String gcPausedTimeMetricName;
-    private String youngHeapUsageMetricName;
-    private String tenuredHeapUsageMetricName;
-    private String totalHeapUsageMetricName;
-    private Map<String, String> dimensions;
-    private List<EagleMetric> metrics = new ArrayList<>();
-
-    private EagleServiceReporterMetricListener listener;
-
-    @Override
-    public void prepareConfig(Config config) {
-        this.config = config;
-    }
-
-    @Override
-    public void init() {
-        registry = new MetricRegistry();
-        String host = EagleConfigHelper.getServiceHost(config);
-        int port = EagleConfigHelper.getServicePort(config);
-        String username = EagleConfigHelper.getServiceUser(config);
-        String password = EagleConfigHelper.getServicePassword(config);
-        listener = new EagleServiceReporterMetricListener(host, port, username, password);
-        dimensions = new HashMap<>();
-        dimensions.put(EagleConfigConstants.SITE, EagleConfigHelper.getSite(config));
-        dimensions.put(EagleConfigConstants.APPLICATION, EagleConfigHelper.getApplication(config));
-        gcPausedTimeMetricName = MetricKeyCodeDecoder.codeMetricKey(GCConstants.GC_PAUSE_TIME_METRIC_NAME, dimensions);
-    }
-
-    public void registerMetricIfMissing(String metricName, EagleMetric metric) {
-        if (registry.getMetrics().get(metricName) == null) {
-            metric.registerListener(listener);
-            registry.register(metricName, metric);
-        }
-    }
-
-    @Override
-    public void flatMap(List<Object> input, Collector<Tuple2<String, Map>> collector) {
-        Map<String, Object> map = (Map<String, Object>) input.get(1);
-        GCPausedEvent event = new GCPausedEvent(map);
-        // Generate gc paused time metric
-        EagleCounterMetric metric = new EagleCounterMetric(event.getTimestamp(), gcPausedTimeMetricName, event.getPausedGCTimeSec(), GCConstants.GC_PAUSE_TIME_METRIC_GRANULARITY);
-        registerMetricIfMissing(gcPausedTimeMetricName, metric);
-
-        // Generate young heap paused time metric
-        if (event.isYoungAreaGCed()) {
-            youngHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_YOUNG_MEMORY_METRIC_NAME, dimensions);
-            EagleGaugeMetric metric2 = new EagleGaugeMetric(event.getTimestamp(), youngHeapUsageMetricName, event.getYoungUsedHeapK());
-            metrics.add(metric2);
-        }
-
-        // Generate tenured heap paused time metric
-        if (event.isTenuredAreaGCed()) {
-            tenuredHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TENURED_MEMORY_METRIC_NAME, dimensions);
-            EagleGaugeMetric metric3 = new EagleGaugeMetric(event.getTimestamp(), tenuredHeapUsageMetricName, event.getTenuredUsedHeapK());
-            metrics.add(metric3);
-        }
-
-        // Generate total heap paused time metric
-        if (event.isTotalHeapUsageAvailable()) {
-            totalHeapUsageMetricName = MetricKeyCodeDecoder.codeTSMetricKey(event.getTimestamp(), GCConstants.GC_TOTAL_MEMORY_METRIC_NAME, dimensions);
-            EagleGaugeMetric metric4 = new EagleGaugeMetric(event.getTimestamp(), totalHeapUsageMetricName, event.getUsedTotalHeapK());
-            metrics.add(metric4);
-        }
-        listener.onMetricFlushed(metrics);
-        metrics.clear();
-        collector.collect(new Tuple2(input.get(0), input.get(1)));
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java b/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java
deleted file mode 100644
index f1b3105..0000000
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/spout/GCLogDeserializer.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-package org.apache.eagle.gc.spout;
-
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
-import org.apache.hadoop.hbase.util.Bytes;
-
-import java.util.Properties;
-
-public class GCLogDeserializer implements SpoutKafkaMessageDeserializer {
-
-    private Properties props;
-
-    public  GCLogDeserializer(Properties props){
-        this.props = props;
-    }
-
-    @Override
-    public Object deserialize(byte[] arg0) {
-        return Bytes.toString(arg0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml b/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml
new file mode 100644
index 0000000..213132d
--- /dev/null
+++ b/eagle-gc/src/main/resources/META-INF/providers/org.apache.eagle.security.hbase.GCLogApplicationProvider.xml
@@ -0,0 +1,221 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ /*
+  ~  *
+  ~  *    Licensed to the Apache Software Foundation (ASF) under one or more
+  ~  *    contributor license agreements.  See the NOTICE file distributed with
+  ~  *    this work for additional information regarding copyright ownership.
+  ~  *    The ASF licenses this file to You under the Apache License, Version 2.0
+  ~  *    (the "License"); you may not use this file except in compliance with
+  ~  *    the License.  You may obtain a copy of the License at
+  ~  *
+  ~  *       http://www.apache.org/licenses/LICENSE-2.0
+  ~  *
+  ~  *    Unless required by applicable law or agreed to in writing, software
+  ~  *    distributed under the License is distributed on an "AS IS" BASIS,
+  ~  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  *    See the License for the specific language governing permissions and
+  ~  *    limitations under the License.
+  ~  *
+  ~  */
+  -->
+
+<application>
+    <type>GCLogApplication</type>
+    <name>GC Log Monitoring Application</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.gc.GCLogApplication</appClass>
+    <viewPath>/apps/example</viewPath>
+    <configuration>
+        <property>
+            <name>dataSourceConfig.topic</name>
+            <displayName>dataSourceConfig.topic</displayName>
+            <value>gc_log</value>
+            <description>data source topic</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkConnection</name>
+            <displayName>dataSourceConfig.zkConnection</displayName>
+            <value>server.eagle.apache.org:2181</value>
+            <description>zk connection</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.zkConnectionTimeoutMS</name>
+            <displayName>dataSourceConfig.zkConnectionTimeoutMS</displayName>
+            <value>15000</value>
+            <description>zk connection timeout in milliseconds</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.fetchSize</name>
+            <displayName>dataSourceConfig.fetchSize</displayName>
+            <value>1048586</value>
+            <description>kafka fetch size</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKServers</name>
+            <displayName>dataSourceConfig.transactionZKServers</displayName>
+            <value>server.eagle.apache.org</value>
+            <description>zookeeper server for offset transaction</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKPort</name>
+            <displayName>dataSourceConfig.transactionZKPort</displayName>
+            <value>2181</value>
+            <description>zookeeper server port for offset transaction</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKRoot</name>
+            <displayName>dataSourceConfig.transactionZKRoot</displayName>
+            <value>/consumers</value>
+            <description>offset transaction root</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.consumerGroupId</name>
+            <displayName>dataSourceConfig.consumerGroupId</displayName>
+            <value>eagle.hbaseaudit.consumer</value>
+            <description>kafka consumer group Id</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionStateUpdateMS</name>
+            <displayName>dataSourceConfig.transactionStateUpdateMS</displayName>
+            <value>2000</value>
+            <description>zk transaction state update interval in milliseconds</description>
+        </property>
+        <property>
+            <name>dataSourceConfig.schemeCls</name>
+            <displayName>dataSourceConfig.schemeCls</displayName>
+            <value>storm.kafka.StringScheme</value>
+            <description>scheme class</description>
+        </property>
+        <property>
+            <name>topology.numOfSpoutTasks</name>
+            <displayName>topology.numOfSpoutTasks</displayName>
+            <value>2</value>
+            <description>number of spout tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfAnalyzerTasks</name>
+            <displayName>topology.numOfAnalyzerTasks</displayName>
+            <value>2</value>
+            <description>number of analyzer tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfGeneratorTasks</name>
+            <displayName>topology.numOfGeneratorTasks</displayName>
+            <value>2</value>
+            <description>number of generator tasks</description>
+        </property>
+        <property>
+            <name>topology.numOfSinkTasks</name>
+            <displayName>topology.numOfSinkTasks</displayName>
+            <value>2</value>
+            <description>number of sink tasks</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.host</name>
+            <displayName>eagleProps.eagleService.host</displayName>
+            <value>localhost</value>
+            <description>eagle service host</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.port</name>
+            <displayName>eagleProps.eagleService.port</displayName>
+            <value>8080</value>
+            <description>eagle service port</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.username</name>
+            <displayName>eagleProps.eagleService.username</displayName>
+            <value>admin</value>
+            <description>eagle service username</description>
+        </property>
+        <property>
+            <name>eagleProps.eagleService.password</name>
+            <displayName>eagleProps.eagleService.password</displayName>
+            <value>secret</value>
+            <description>eagle service password</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.topic</name>
+            <displayName>dataSinkConfig.topic</displayName>
+            <value>hbase_audit_log_parsed</value>
+            <description>topic for kafka data sink</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.brokerList</name>
+            <displayName>dataSinkConfig.brokerList</displayName>
+            <value>sandbox.hortonworks.com:6667</value>
+            <description>kafka broker list</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.serializerClass</name>
+            <displayName>dataSinkConfig.serializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class Kafka message value</description>
+        </property>
+        <property>
+            <name>dataSinkConfig.keySerializerClass</name>
+            <displayName>dataSinkConfig.keySerializerClass</displayName>
+            <value>kafka.serializer.StringEncoder</value>
+            <description>serializer class Kafka message key</description>
+        </property>
+    </configuration>
+    <streams>
+        <stream>
+            <streamId>gc_log_stream</streamId>
+            <description>GC Log Stream</description>
+            <validate>true</validate>
+            <timeseries>true</timeseries>
+            <columns>
+                <column>
+                    <name>action</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>host</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>status</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>timestamp</name>
+                    <type>long</type>
+                </column>
+            </columns>
+        </stream>
+    </streams>
+    <docs>
+        <install>
+            # Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+            ./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1
+
+            # Step 2: Set up data collector to flow data into kafka topic
+
+            ./bin/logstash -f log_collector.conf
+
+            ## `log_collector.conf` sample as follows:
+
+            input {
+
+            }
+            filter {
+
+            }
+            output{
+
+            }
+
+            # Step 3: start application
+
+            # Step 4: monitor with featured portal or alert with policies
+        </install>
+        <uninstall>
+            # Step 1: stop and uninstall application
+            # Step 2: delete kafka topic named "${site}_example_source_topic"
+            # Step 3: stop logstash
+        </uninstall>
+    </docs>
+</application>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..fc9406f
--- /dev/null
+++ b/eagle-gc/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,37 @@
+#
+# /*
+#  *
+#  *    Licensed to the Apache Software Foundation (ASF) under one or more
+#  *    contributor license agreements.  See the NOTICE file distributed with
+#  *    this work for additional information regarding copyright ownership.
+#  *    The ASF licenses this file to You under the Apache License, Version 2.0
+#  *    (the "License"); you may not use this file except in compliance with
+#  *    the License.  You may obtain a copy of the License at
+#  *
+#  *       http://www.apache.org/licenses/LICENSE-2.0
+#  *
+#  *    Unless required by applicable law or agreed to in writing, software
+#  *    distributed under the License is distributed on an "AS IS" BASIS,
+#  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  *    See the License for the specific language governing permissions and
+#  *    limitations under the License.
+#  *
+#  */
+#
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.gc.GCLogApplicationProvider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/application.conf b/eagle-gc/src/main/resources/application.conf
index ed14063..33fa733 100644
--- a/eagle-gc/src/main/resources/application.conf
+++ b/eagle-gc/src/main/resources/application.conf
@@ -13,48 +13,40 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 {
-  "envContextConfig" : {
-    "env" : "storm",
-    "mode" : "local",
-    "topologyName" : "GCAnalysorTopology",
-    "stormConfigFile" : "gc-storm.yaml",
-    "parallelismConfig" : {
-      "kafkaMsgConsumer" : 1
-    }
+  "appId" : "GCLogApp",
+  "mode" : "LOCAL",
+  "siteId" : "testsite",
+  "topology" : {
+    "numOfTotalWorkers" : 2,
+    "numOfSpoutTasks" : 2,
+    "numOfAnalyzerTasks" : 2,
+    "numOfGeneratorTasks" : 2,
+    "numOfSinkTasks" : 2
   },
   "dataSourceConfig": {
-    "site" : "sandbox",
-    "topic" : "sandbox-namenode-gc_log",
-    "consumerGroupId" : "gc.log.eagle.consumer",
-    "zkSessionTimeoutMs" : 15000,
-    "zkRetryTimes" : 3,
-    "zkRetryInterval" : 2000,
+    "topic" : "gc_log",
+    "zkConnection" : "server.eagle.apache.org:2181",
     "zkConnectionTimeoutMS" : 15000,
     "fetchSize" : 1048586,
-    "deserializerClass" : "org.apache.eagle.gc.spout.GCLogDeserializer",
-    "zkConnection" : "localhost:2181",
-    "transactionZKServers" : "localhost",
+    "transactionZKServers" : "server.eagle.apache.org",
     "transactionZKPort" : "2181",
     "transactionZKRoot" : "/consumers",
+    "consumerGroupId" : "gc.log.eagle.consumer",
     "transactionStateUpdateMS" : 2000,
-    "kafkaEndPoints" : "localhost:6667"
+    "schemeCls" : "storm.kafka.StringScheme"
   },
   "eagleProps" : {
-    "site" : "sandbox",
-    "application": "NNGCLog",
-    "mailHost" : "www.xyz.com",
-    "mailSmtpPort":"25",
-    "mailDebug" : "true",
     "eagleService": {
       "host": "localhost",
-      "port": 38080,
+      "port": 9090,
       "username": "admin",
       "password": "secret"
     }
   },
-  "dynamicConfigSource" : {
-    "enabled" : true,
-    "initDelayMillis" : 0,
-    "delayMillis" : 30000
+  "dataSinkConfig": {
+    "topic" : "gc_log_parsed",
+    "brokerList" : "server.eagle.apache.org:6667",
+    "serializerClass" : "kafka.serializer.StringEncoder",
+    "keySerializerClass" : "kafka.serializer.StringEncoder"
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/gc-storm.yaml
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/gc-storm.yaml b/eagle-gc/src/main/resources/gc-storm.yaml
deleted file mode 100644
index a68a323..0000000
--- a/eagle-gc/src/main/resources/gc-storm.yaml
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-topology.workers: 1
-topology.acker.executors: 1
-topology.tasks: 1
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-gc/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/resources/log4j.properties b/eagle-gc/src/main/resources/log4j.properties
index 8a0919a..f5fb8a8 100644
--- a/eagle-gc/src/main/resources/log4j.properties
+++ b/eagle-gc/src/main/resources/log4j.properties
@@ -19,9 +19,6 @@ eagle.log.dir=./logs
 eagle.log.file=eagle.log
 
 
-#log4j.logger.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
-#log4j.logger.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
-#log4j.logger.eagle.executor.AlertExecutor=DEBUG
 # standard output
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml
index ce13ee9..389a481 100644
--- a/eagle-hadoop-metric/pom.xml
+++ b/eagle-hadoop-metric/pom.xml
@@ -35,6 +35,11 @@
             <artifactId>eagle-stream-application-manager</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-app-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
new file mode 100644
index 0000000..20ef5d0
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.hadoop.metric;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.TopologyBuilder;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+
+/**
+ * {@link StormApplication} whose {@code execute} currently returns an empty topology (no spouts or bolts are added). Since 8/12/16.
+ */
+public class HadoopJmxApplication extends StormApplication {
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        TopologyBuilder builder = new TopologyBuilder();
+        return builder.createTopology();
+    }
+
+    public static void main(String[] args){
+        Config config = ConfigFactory.load();
+        HadoopJmxApplication app = new HadoopJmxApplication();
+        app.run(config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java
index 9202da4..c0b183b 100644
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitor.java
@@ -17,7 +17,6 @@
 package org.apache.eagle.hadoop.metric;
 
 import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.core.StreamProducer;
 import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
 
 /**
@@ -26,10 +25,10 @@ import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
 public class HadoopJmxMetricMonitor {
 
     public static void main(String[] args) {
-        StormExecutionEnvironment env = ExecutionEnvironments.get(args, StormExecutionEnvironment.class);
-        String streamName = "hadoopJmxMetricEventStream";
-        StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName);
-        sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor");
-        env.execute();
+//        StormExecutionEnvironment env = ExecutionEnvironments.get(args, StormExecutionEnvironment.class);
+//        String streamName = "hadoopJmxMetricEventStream";
+//        StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName);
+//        sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor");
+//        env.execute();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java
deleted file mode 100644
index 044a48f..0000000
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxMetricMonitoringTopology.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.hadoop.metric;
-
-
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.core.StreamProducer;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.stream.application.TopologyExecutable;
-
-public class HadoopJmxMetricMonitoringTopology implements TopologyExecutable {
-    @Override
-    public void submit(String topology, Config config) {
-        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
-        String streamName = "hadoopJmxMetricEventStream";
-        StreamProducer sp = env.fromSpout(Utils.createProvider(env.getConfig())).withOutputFields(2).nameAs(streamName);
-        sp.alertWithConsumer(streamName, "hadoopJmxMetricAlertExecutor");
-        env.execute();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
new file mode 100644
index 0000000..c738b90
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Storm application that collects Eagle's Kafka metrics.
+ *
+ * <p>The topology wires two spouts, {@code kafkaLogLagChecker} and {@code kafkaMessageFetcher},
+ * into a single {@code distributionBolt} ({@link KafkaMessageDistributionBolt}).
+ *
+ * <p>Since 8/12/16.
+ */
+public class EagleMetricCollectorApplication extends StormApplication{
+    private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorApplication.class);
+
+    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+    public final static String DISTRIBUTION_TASK_NUM = "topology.numOfDistributionTasks";
+
+    /**
+     * Builds the metric-collection topology.
+     *
+     * @param config application configuration; the {@code dataSourceConfig.*} keys and the two
+     *               task-number keys declared on this class are read from it
+     * @param environment Storm environment supplied by the application framework (not used here)
+     * @return the assembled topology
+     */
+    @Override
+    public StormTopology execute(Config config, StormEnvironment environment) {
+        String deserClsName = config.getString("dataSourceConfig.deserializerClass");
+        // Emits a (user, timestamp) pair for every deserialized Kafka message.
+        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
+            @Override
+            public List<Object> deserialize(byte[] ser) {
+                Object tmp = deserializer.deserialize(ser);
+                if (tmp == null) {
+                    return null;
+                }
+                Map<String, Object> map = (Map<String, Object>) tmp;
+                return Arrays.asList(map.get("user"), map.get("timestamp"));
+            }
+        };
+
+        // TODO: Refactor the anonymous classes into independent class files to keep this method simple
+        KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() {
+            @Override
+            public BaseRichSpout getSpout(Config context) {
+                // Kafka topic
+                String topic = context.getString("dataSourceConfig.topic");
+                // Kafka consumer group id
+                String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
+                // Kafka fetch size
+                int fetchSize = context.getInt("dataSourceConfig.fetchSize");
+                // Kafka broker zk connection
+                String zkConnString = context.getString("dataSourceConfig.zkQuorum");
+                // transaction zkRoot
+                String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
+
+                LOG.info("Use topic id: {}", topic);
+
+                // The broker zk path is optional; without it ZkHosts is built from the quorum alone.
+                BrokerHosts hosts;
+                if (context.hasPath("dataSourceConfig.brokerZkPath")) {
+                    hosts = new ZkHosts(zkConnString, context.getString("dataSourceConfig.brokerZkPath"));
+                } else {
+                    hosts = new ZkHosts(zkConnString);
+                }
+
+                SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot + "/" + topic, groupId);
+
+                // transaction zkServers: host names only; the port is taken from the first entry
+                String[] zkConnections = zkConnString.split(",");
+                List<String> zkHosts = new ArrayList<>();
+                for (String zkConnection : zkConnections) {
+                    zkHosts.add(zkConnection.split(":")[0]);
+                }
+                spoutConfig.zkServers = zkHosts;
+                // transaction zkPort
+                spoutConfig.zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
+                // transaction update interval
+                spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
+                // Kafka fetch size
+                spoutConfig.fetchSizeBytes = fetchSize;
+                spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
+                return new KafkaSpout(spoutConfig);
+            }
+        };
+
+        TopologyBuilder builder = new TopologyBuilder();
+        BaseRichSpout spout1 = new KafkaOffsetSourceSpoutProvider().getSpout(config);
+        BaseRichSpout spout2 = kafkaMessageSpoutProvider.getSpout(config);
+
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfDistributionTasks = config.getInt(DISTRIBUTION_TASK_NUM);
+
+        builder.setSpout("kafkaLogLagChecker", spout1, numOfSpoutTasks);
+        builder.setSpout("kafkaMessageFetcher", spout2, numOfSpoutTasks);
+
+        KafkaMessageDistributionBolt bolt = new KafkaMessageDistributionBolt(config);
+        BoltDeclarer boltDeclarer = builder.setBolt("distributionBolt", bolt, numOfDistributionTasks);
+        boltDeclarer.fieldsGrouping("kafkaLogLagChecker", new Fields("f1"));
+        // The message spout must be subscribed too; grouping on the lag checker twice would
+        // leave kafkaMessageFetcher without any consumer.
+        // NOTE(review): "f1" is assumed to be a field declared by the message spout's scheme - confirm.
+        boltDeclarer.fieldsGrouping("kafkaMessageFetcher", new Fields("f1"));
+        return builder.createTopology();
+    }
+
+    /**
+     * Loads the default configuration and runs the application.
+     */
+    public static void main(String[] args){
+        Config config = ConfigFactory.load();
+        EagleMetricCollectorApplication app = new EagleMetricCollectorApplication();
+        app.run(config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
deleted file mode 100644
index 4eeda05..0000000
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorMain.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metric.kafka;
-
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class EagleMetricCollectorMain {
-
-    private static final Logger LOG = LoggerFactory.getLogger(EagleMetricCollectorMain.class);
-
-    public static void main(String[] args) throws Exception {
-        StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
-        Config config = env.getConfig();
-        String deserClsName = config.getString("dataSourceConfig.deserializerClass");
-        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
-            @Override
-            public List<Object> deserialize(byte[] ser) {
-                Object tmp = deserializer.deserialize(ser);
-                Map<String, Object> map = (Map<String, Object>)tmp;
-                if(tmp == null) return null;
-                return Arrays.asList(map.get("user"), map.get("timestamp"));
-            }
-        };
-
-
-        // TODO: Refactored the anonymous in to independen class file, avoiding too complex logic in main method
-        KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() {
-            @Override
-            public BaseRichSpout getSpout(Config context) {
-                // Kafka topic
-                String topic = context.getString("dataSourceConfig.topic");
-                // Kafka consumer group id
-                String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
-                // Kafka fetch size
-                int fetchSize = context.getInt("dataSourceConfig.fetchSize");
-                // Kafka deserializer class
-                String deserClsName = context.getString("dataSourceConfig.deserializerClass");
-
-                // Kafka broker zk connection
-                String zkConnString = context.getString("dataSourceConfig.zkQuorum");
-
-                // transaction zkRoot
-                String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
-
-                LOG.info(String.format("Use topic id: %s",topic));
-
-                String brokerZkPath = null;
-                if(context.hasPath("dataSourceConfig.brokerZkPath")) {
-                    brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
-                }
-
-                BrokerHosts hosts;
-                if(brokerZkPath == null) {
-                    hosts = new ZkHosts(zkConnString);
-                } else {
-                    hosts = new ZkHosts(zkConnString, brokerZkPath);
-                }
-
-                SpoutConfig spoutConfig = new SpoutConfig(hosts,
-                        topic,
-                        zkRoot + "/" + topic,
-                        groupId);
-
-                // transaction zkServers
-                String[] zkConnections = zkConnString.split(",");
-                List<String> zkHosts = new ArrayList<>();
-                for (String zkConnection : zkConnections) {
-                    zkHosts.add(zkConnection.split(":")[0]);
-                }
-                Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
-
-                spoutConfig.zkServers = zkHosts;
-                // transaction zkPort
-                spoutConfig.zkPort = zkPort;
-                // transaction update interval
-                spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
-                // Kafka fetch size
-                spoutConfig.fetchSizeBytes = fetchSize;
-
-                spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
-                KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
-                return kafkaSpout;
-            }
-        };
-
-        env.fromSpout(new KafkaOffsetSourceSpoutProvider()).withOutputFields(0).nameAs("kafkaLogLagChecker");
-        env.fromSpout(kafkaMessageSpoutProvider).withOutputFields(2).nameAs("kafkaMessageFetcher").groupBy(Arrays.asList(0))
-                .flatMap(new KafkaMessageDistributionExecutor());
-        env.execute();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java
new file mode 100644
index 0000000..0caa64c
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionBolt.java
@@ -0,0 +1,134 @@
+/*
+ *
+ *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *    contributor license agreements.  See the NOTICE file distributed with
+ *    this work for additional information regarding copyright ownership.
+ *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *    (the "License"); you may not use this file except in compliance with
+ *    the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ *
+ */
+
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import com.codahale.metrics.MetricRegistry;
+import com.typesafe.config.Config;
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor1;
+import org.apache.eagle.metric.reportor.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple1;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Terminal bolt that counts Kafka messages per user.
+ *
+ * <p>Each input tuple is read as (user, timestamp). A counter metric keyed by site, topic and
+ * user is created on first sight and updated afterwards; an
+ * {@link EagleServiceReporterMetricListener} is registered on every newly created metric.
+ */
+public class KafkaMessageDistributionBolt extends BaseRichBolt {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionBolt.class);
+    private static final long DEFAULT_METRIC_GRANULARITY = 60 * 1000;
+    private Config config;
+    private Map<String, String> baseMetricDimension;
+    private MetricRegistry registry;
+    private EagleMetricListener listener;
+    private long granularity;
+    private OutputCollector collector;
+
+    public KafkaMessageDistributionBolt(Config config){
+        this.config = config;
+    }
+
+    /**
+     * Initializes the bolt: keeps the collector for acking, builds the base metric dimensions
+     * (site and topic), resolves the metric granularity and creates the Eagle service listener.
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        // Must be stored: execute() acks every tuple through this collector.
+        this.collector = collector;
+
+        String site = config.getString("dataSourceConfig.site");
+        String topic = config.getString("dataSourceConfig.topic");
+        this.baseMetricDimension = new HashMap<>();
+        this.baseMetricDimension.put("site", site);
+        this.baseMetricDimension.put("topic", topic);
+        registry = new MetricRegistry();
+
+        // Granularity is configured in minutes; fall back to the default when the key is absent.
+        this.granularity = DEFAULT_METRIC_GRANULARITY;
+        if (config.hasPath("dataSourceConfig.kafkaDistributionDataIntervalMin")) {
+            this.granularity = config.getInt("dataSourceConfig.kafkaDistributionDataIntervalMin") * DateUtils.MILLIS_PER_MINUTE;
+        }
+
+        String servicePath = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+        String host = config.getString(servicePath + EagleConfigConstants.HOST);
+        int port = config.getInt(servicePath + EagleConfigConstants.PORT);
+        String username = config.getString(servicePath + EagleConfigConstants.USERNAME);
+        String password = config.getString(servicePath + EagleConfigConstants.PASSWORD);
+        listener = new EagleServiceReporterMetricListener(host, port, username, password);
+    }
+
+    /**
+     * Encodes the metric key for {@code user} from the metric name and the site/topic/user dimensions.
+     *
+     * @param user user the message is attributed to
+     * @return the key produced by {@link MetricKeyCodeDecoder#codeMetricKey}
+     */
+    public String generateMetricKey(String user) {
+        Map<String, String> dimensions = new HashMap<>(baseMetricDimension);
+        dimensions.put("user", user);
+        return MetricKeyCodeDecoder.codeMetricKey("eagle.kafka.message.count", dimensions);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Intentionally empty: this bolt emits no tuples.
+    }
+
+    /**
+     * Counts one message for the tuple's user and always acks the tuple, even when processing fails.
+     */
+    @Override
+    public void execute(Tuple input) {
+        try {
+            String user = input.getString(0);
+            Long timestamp = input.getLong(1);
+            String metricKey = generateMetricKey(user);
+            if (registry.getMetrics().get(metricKey) == null) {
+                EagleCounterMetric metric = new EagleCounterMetric(timestamp, metricKey, 1.0, granularity);
+                metric.registerListener(listener);
+                registry.register(metricKey, metric);
+            } else {
+                EagleMetric metric = (EagleMetric) registry.getMetrics().get(metricKey);
+                metric.update(1, timestamp);
+                //TODO: if we need to remove metric from registry
+            }
+        } catch (Exception ex) {
+            LOG.error("Got an exception, ex: ", ex);
+        } finally {
+            collector.ack(input);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/98458658/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
deleted file mode 100644
index dbb6e9a..0000000
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaMessageDistributionExecutor.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- *
- *    Licensed to the Apache Software Foundation (ASF) under one or more
- *    contributor license agreements.  See the NOTICE file distributed with
- *    this work for additional information regarding copyright ownership.
- *    The ASF licenses this file to You under the Apache License, Version 2.0
- *    (the "License"); you may not use this file except in compliance with
- *    the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- *
- */
-
-package org.apache.eagle.metric.kafka;
-
-import com.codahale.metrics.MetricRegistry;
-import com.typesafe.config.Config;
-import org.apache.commons.lang.time.DateUtils;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor1;
-import org.apache.eagle.metric.reportor.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple1;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class KafkaMessageDistributionExecutor extends JavaStormStreamExecutor1<String> {
-
-    private Config config;
-    private Map<String, String> baseMetricDimension;
-    private MetricRegistry registry;
-    private EagleMetricListener listener;
-    private long granularity;
-    private static final long DEFAULT_METRIC_GRANULARITY = 60 * 1000;
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaMessageDistributionExecutor.class);
-
-    @Override
-    public void prepareConfig(Config config) {
-        this.config = config;
-    }
-
-    @Override
-    public void init() {
-        String site = config.getString("dataSourceConfig.site");
-        String topic = config.getString("dataSourceConfig.topic");
-        this.baseMetricDimension = new HashMap<>();
-        this.baseMetricDimension.put("site", site);
-        this.baseMetricDimension.put("topic", topic);
-        registry = new MetricRegistry();
-
-        this.granularity = DEFAULT_METRIC_GRANULARITY;
-        if (config.hasPath("dataSourceConfig.kafkaDistributionDataIntervalMin")) {
-            this.granularity = config.getInt("dataSourceConfig.kafkaDistributionDataIntervalMin") * DateUtils.MILLIS_PER_MINUTE;
-        }
-
-        String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-        int port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-        String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
-        String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
-        listener = new EagleServiceReporterMetricListener(host, port, username, password);
-    }
-
-    public String generateMetricKey(String user) {
-        Map<String, String> dimensions = new HashMap<>();
-        dimensions.putAll(baseMetricDimension);
-        dimensions.put("user", user);
-        String metricName = "eagle.kafka.message.count";
-        String encodedMetricName = MetricKeyCodeDecoder.codeMetricKey(metricName, dimensions);
-        return encodedMetricName;
-    }
-
-    @Override
-    public void flatMap(List<Object> input, Collector<Tuple1<String>> collector) {
-        try {
-            String user = (String) input.get(0);
-            Long timestamp = (Long) input.get(1);
-            String metricKey = generateMetricKey(user);
-            if (registry.getMetrics().get(metricKey) == null) {
-                EagleCounterMetric metric = new EagleCounterMetric(timestamp, metricKey, 1.0, granularity);
-                metric.registerListener(listener);
-                registry.register(metricKey, metric);
-            }
-            else {
-                EagleMetric metric = (EagleMetric)registry.getMetrics().get(metricKey);
-                metric.update(1, timestamp);
-                //TODO: if we need to remove metric from registry
-            }
-        }
-        catch (Exception ex) {
-            LOG.error("Got an exception, ex: ", ex);
-        }
-    }
-}