You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/14 23:52:53 UTC

[2/2] incubator-eagle git commit: clean up eagle-data-process Author: @yonzhang2012 Closes: #343

clean up eagle-data-process
Author: @yonzhang2012 <yo...@gmail.com>
Closes: #343


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b4732cb2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b4732cb2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b4732cb2

Branch: refs/heads/develop
Commit: b4732cb2fc104350e2f9e6c3edb8518f41a21a11
Parents: c66b525
Author: yonzhang <yo...@gmail.com>
Authored: Sun Aug 14 16:56:44 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Sun Aug 14 16:56:44 2016 -0700

----------------------------------------------------------------------
 .../AbstractDynamicApplication.scala            |  30 -----
 .../src/main/java/META-INF/MANIFEST.MF          |  19 ---
 .../impl/storm/kafka/JsonSerializer.java        |  58 ---------
 .../storm/kafka/KafkaSourcedSpoutProvider.java  | 104 ----------------
 .../storm/kafka/KafkaSourcedSpoutScheme.java    |  71 -----------
 .../impl/storm/kafka/KafkaSpoutProvider.java    | 118 +++++++++++++++++++
 .../kafka/NewKafkaSourcedSpoutProvider.java     | 118 -------------------
 .../eagle/datastream/utils/JavaReflections.java |  31 -----
 .../src/main/resources/application.conf         |   1 -
 .../eagle/datastream/utils/ReflectionS.scala    |  55 ---------
 .../entity/AbstractPolicyDefinitionEntity.java  |  27 -----
 .../alert/entity/AlertStreamSchemaEntity.java   | 111 -----------------
 .../eagle/policy/DefaultPolicyPartitioner.java  |  32 -----
 .../eagle/policy/PolicyEvaluationContext.java   |  34 ------
 .../apache/eagle/policy/PolicyEvaluator.java    |  61 ----------
 .../apache/eagle/policy/PolicyPartitioner.java  |  26 ----
 .../org/apache/eagle/policy/ResultRender.java   |  32 -----
 .../eagle/policy/executor/IPolicyExecutor.java  |  29 -----
 .../policy/siddhi/SiddhiEvaluationHandler.java  |  27 -----
 .../src/main/resources/log4j.properties         |  21 ----
 .../org/apache/eagle/gc/GCLogApplication.java   |   4 +-
 eagle-hadoop-metric/pom.xml                     |  24 ----
 .../assembly/eagle-hadoop-metric-assembly.xml   |  64 ----------
 .../hadoop/metric/HadoopJmxApplication.java     |  32 +++++
 .../eagle/hadoop/metric/JsonParserBolt.java     |  62 ++++++++++
 .../org/apache/eagle/hadoop/metric/Utils.java   |  64 ----------
 .../src/main/resources/application.conf         |  52 +++-----
 .../kafka/EagleMetricCollectorApplication.java  |  71 +----------
 .../metric/kafka/KafkaSourcedSpoutProvider.java |  93 +++++++++++++++
 .../metric/kafka/KafkaSourcedSpoutScheme.java   |  72 +++++++++++
 .../hbase/HBaseAuditLogApplication.java         |   4 +-
 .../AbstractHdfsAuditLogApplication.java        |   4 +-
 .../securitylog/HdfsAuthLogMonitoringMain.java  |   4 +-
 .../oozie/parse/OozieAuditLogApplication.java   |   4 +-
 34 files changed, 405 insertions(+), 1154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
deleted file mode 100644
index b5fbf59..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import com.typesafe.config.Config
-import org.apache.eagle.datastream.core.StreamContext
-
-
-trait AbstractDynamicApplication extends TopologyExecutable {
-  def compileStream(application: String, config: Config): StreamContext = {
-    null
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF b/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF
deleted file mode 100644
index c67816b..0000000
--- a/eagle-core/eagle-common/src/main/java/META-INF/MANIFEST.MF
+++ /dev/null
@@ -1,19 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-Manifest-Version: 1.0
-Class-Path: 
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
deleted file mode 100644
index 416aaa3..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import org.apache.kafka.common.serialization.Serializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.SerializationConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class JsonSerializer implements Serializer<Object> {
-    private final StringSerializer stringSerializer = new StringSerializer();
-    private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
-    private static final ObjectMapper om = new ObjectMapper();
-
-    static {
-        om.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, true);
-    }
-
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        stringSerializer.configure(configs,isKey);
-    }
-
-    @Override
-    public byte[] serialize(String topic, Object data) {
-        String str = null;
-        try {
-            str = om.writeValueAsString(data);
-        } catch (IOException e) {
-            logger.error("Kafka serialization for send error!", e);
-        }
-        return stringSerializer.serialize(topic, str);
-    }
-
-    @Override
-    public void close() {
-        stringSerializer.close();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
deleted file mode 100644
index c6a4983..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import java.util.Arrays;
-
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.topology.base.BaseRichSpout;
-
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-
-public class KafkaSourcedSpoutProvider implements StormSpoutProvider {
-    private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);
-
-	public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
-		return new SchemeAsMultiScheme(new KafkaSourcedSpoutScheme(deserClsName, context));
-	}
-
-    private String configPrefix = "dataSourceConfig";
-
-    public KafkaSourcedSpoutProvider(){}
-
-    public KafkaSourcedSpoutProvider(String prefix){
-        this.configPrefix = prefix;
-    }
-
-	@Override
-	public BaseRichSpout getSpout(Config config){
-        Config context = config;
-        if(this.configPrefix!=null) context = config.getConfig(configPrefix);
-		// Kafka topic
-		String topic = context.getString("topic");
-		// Kafka consumer group id
-		String groupId = context.getString("consumerGroupId");
-		// Kafka fetch size
-		int fetchSize = context.getInt("fetchSize");
-		// Kafka deserializer class
-		String deserClsName = context.getString("deserializerClass");
-		// Kafka broker zk connection
-		String zkConnString = context.getString("zkConnection");
-		// transaction zkRoot
-		String zkRoot = context.getString("transactionZKRoot");
-
-        LOG.info(String.format("Use topic id: %s",topic));
-
-        String brokerZkPath = null;
-        if(context.hasPath("brokerZkPath")) {
-            brokerZkPath = context.getString("brokerZkPath");
-        }
-
-        BrokerHosts hosts;
-        if(brokerZkPath == null) {
-            hosts = new ZkHosts(zkConnString);
-        } else {
-            hosts = new ZkHosts(zkConnString, brokerZkPath);
-        }
-        
-		SpoutConfig spoutConfig = new SpoutConfig(hosts, 
-				topic,
-				zkRoot + "/" + topic,
-				groupId);
-		
-		// transaction zkServers
-		spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
-		// transaction zkPort
-		spoutConfig.zkPort = context.getInt("transactionZKPort");
-		// transaction update interval
-		spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
-		// Kafka fetch size
-		spoutConfig.fetchSizeBytes = fetchSize;		
-		// "startOffsetTime" is for test usage, prod should not use this
-		if (context.hasPath("startOffsetTime")) {
-			spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
-		}		
-		// "forceFromStart" is for test usage, prod should not use this 
-		if (context.hasPath("forceFromStart")) {
-			spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-		}
-		
-		spoutConfig.scheme = getStreamScheme(deserClsName, context);
-        return new KafkaSpout(spoutConfig);
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
deleted file mode 100644
index 15401fd..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.tuple.Fields;
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.utils.NameConstants;
-
-import java.lang.reflect.Constructor;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-/**
- * This scheme defines how a kafka message is deserialized and the output field name for storm stream
- * it includes the following:
- * 1. data source is kafka, so need kafka message deserializer class
- * 2. output field declaration
- */
-public class KafkaSourcedSpoutScheme implements Scheme {
-	protected SpoutKafkaMessageDeserializer deserializer;
-	
-	public KafkaSourcedSpoutScheme(String deserClsName, Config context){
-		try{
-			Properties prop = new Properties();
-            if(context.hasPath("eagleProps")) {
-                prop.putAll(context.getObject("eagleProps"));
-            }
-			Constructor<?> constructor =  Class.forName(deserClsName).getConstructor(Properties.class);
-			deserializer = (SpoutKafkaMessageDeserializer) constructor.newInstance(prop);
-		}catch(Exception ex){
-			throw new RuntimeException("Failed to create new instance for decoder class " + deserClsName, ex);
-		}
-	}
-	
-	@Override
-	public List<Object> deserialize(byte[] ser) {
-		Object tmp = deserializer.deserialize(ser);
-		if(tmp == null)
-			return null;
-		// the following tasks are executed within the same process of kafka spout
-		return Arrays.asList(tmp);
-	}
-
-    /**
-     * Default only f0, but it requires to be overrode if different
-     *
-     * TODO: Handle the schema with KeyValue based structure
-     *
-     * @return Fields
-     */
-	@Override
-	public Fields getOutputFields() {
-        return new Fields(NameConstants.FIELD_PREFIX()+"0");
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java
new file mode 100644
index 0000000..2d2936c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSpoutProvider.java
@@ -0,0 +1,118 @@
+/*
+ *
+ *  *
+ *  *    Licensed to the Apache Software Foundation (ASF) under one or more
+ *  *    contributor license agreements.  See the NOTICE file distributed with
+ *  *    this work for additional information regarding copyright ownership.
+ *  *    The ASF licenses this file to You under the Apache License, Version 2.0
+ *  *    (the "License"); you may not use this file except in compliance with
+ *  *    the License.  You may obtain a copy of the License at
+ *  *
+ *  *       http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  *    Unless required by applicable law or agreed to in writing, software
+ *  *    distributed under the License is distributed on an "AS IS" BASIS,
+ *  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  *    See the License for the specific language governing permissions and
+ *  *    limitations under the License.
+ *  *
+ *
+ */
+
+package org.apache.eagle.dataproc.impl.storm.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.Arrays;
+
+/**
+ * Since 6/8/16.
+ */
+public class KafkaSpoutProvider implements StormSpoutProvider {
+    private final static Logger LOG = LoggerFactory.getLogger(KafkaSpoutProvider.class);
+
+    private String configPrefix = "dataSourceConfig";
+
+    public KafkaSpoutProvider(){}
+
+    public KafkaSpoutProvider(String prefix){
+        this.configPrefix = prefix;
+    }
+
+    @Override
+    public BaseRichSpout getSpout(Config config){
+        Config context = config;
+        if(this.configPrefix!=null) context = config.getConfig(configPrefix);
+        // Kafka topic
+        String topic = context.getString("topic");
+        // Kafka consumer group id
+        String groupId = context.getString("consumerGroupId");
+        // Kafka fetch size
+        int fetchSize = context.getInt("fetchSize");
+        // Kafka broker zk connection
+        String zkConnString = context.getString("zkConnection");
+        // transaction zkRoot
+        String zkRoot = context.getString("transactionZKRoot");
+
+        LOG.info(String.format("Use topic id: %s",topic));
+
+        String brokerZkPath = null;
+        if(context.hasPath("brokerZkPath")) {
+            brokerZkPath = context.getString("brokerZkPath");
+        }
+
+        BrokerHosts hosts;
+        if(brokerZkPath == null) {
+            hosts = new ZkHosts(zkConnString);
+        } else {
+            hosts = new ZkHosts(zkConnString, brokerZkPath);
+        }
+
+        SpoutConfig spoutConfig = new SpoutConfig(hosts,
+                topic,
+                zkRoot + "/" + topic,
+                groupId);
+
+        // transaction zkServers
+        spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
+        // transaction zkPort
+        spoutConfig.zkPort = context.getInt("transactionZKPort");
+        // transaction update interval
+        spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
+        // Kafka fetch size
+        spoutConfig.fetchSizeBytes = fetchSize;
+        // "startOffsetTime" is for test usage, prod should not use this
+        if (context.hasPath("startOffsetTime")) {
+            spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
+        }
+        // "forceFromStart" is for test usage, prod should not use this
+        if (context.hasPath("forceFromStart")) {
+            spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+        }
+
+        if (context.hasPath("schemeCls")) {
+            try {
+                Scheme s = (Scheme)Class.forName(context.getString("schemeCls")).newInstance();
+                spoutConfig.scheme = new SchemeAsMultiScheme(s);
+            }catch(Exception ex){
+                LOG.error("error instantiating scheme object");
+                throw new IllegalStateException(ex);
+            }
+        }else{
+            String err = "schemeCls must be present";
+            LOG.error(err);
+            throw new IllegalStateException(err);
+        }
+        return new KafkaSpout(spoutConfig);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
deleted file mode 100644
index d764ac1..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/NewKafkaSourcedSpoutProvider.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- *
- *  *
- *  *    Licensed to the Apache Software Foundation (ASF) under one or more
- *  *    contributor license agreements.  See the NOTICE file distributed with
- *  *    this work for additional information regarding copyright ownership.
- *  *    The ASF licenses this file to You under the Apache License, Version 2.0
- *  *    (the "License"); you may not use this file except in compliance with
- *  *    the License.  You may obtain a copy of the License at
- *  *
- *  *       http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  *    Unless required by applicable law or agreed to in writing, software
- *  *    distributed under the License is distributed on an "AS IS" BASIS,
- *  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  *    See the License for the specific language governing permissions and
- *  *    limitations under the License.
- *  *
- *
- */
-
-package org.apache.eagle.dataproc.impl.storm.kafka;
-
-import backtype.storm.spout.Scheme;
-import backtype.storm.spout.SchemeAsMultiScheme;
-import backtype.storm.topology.base.BaseRichSpout;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-
-import java.util.Arrays;
-
-/**
- * Since 6/8/16.
- */
-public class NewKafkaSourcedSpoutProvider implements StormSpoutProvider {
-    private final static Logger LOG = LoggerFactory.getLogger(NewKafkaSourcedSpoutProvider.class);
-
-    private String configPrefix = "dataSourceConfig";
-
-    public NewKafkaSourcedSpoutProvider(){}
-
-    public NewKafkaSourcedSpoutProvider(String prefix){
-        this.configPrefix = prefix;
-    }
-
-    @Override
-    public BaseRichSpout getSpout(Config config){
-        Config context = config;
-        if(this.configPrefix!=null) context = config.getConfig(configPrefix);
-        // Kafka topic
-        String topic = context.getString("topic");
-        // Kafka consumer group id
-        String groupId = context.getString("consumerGroupId");
-        // Kafka fetch size
-        int fetchSize = context.getInt("fetchSize");
-        // Kafka broker zk connection
-        String zkConnString = context.getString("zkConnection");
-        // transaction zkRoot
-        String zkRoot = context.getString("transactionZKRoot");
-
-        LOG.info(String.format("Use topic id: %s",topic));
-
-        String brokerZkPath = null;
-        if(context.hasPath("brokerZkPath")) {
-            brokerZkPath = context.getString("brokerZkPath");
-        }
-
-        BrokerHosts hosts;
-        if(brokerZkPath == null) {
-            hosts = new ZkHosts(zkConnString);
-        } else {
-            hosts = new ZkHosts(zkConnString, brokerZkPath);
-        }
-
-        SpoutConfig spoutConfig = new SpoutConfig(hosts,
-                topic,
-                zkRoot + "/" + topic,
-                groupId);
-
-        // transaction zkServers
-        spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
-        // transaction zkPort
-        spoutConfig.zkPort = context.getInt("transactionZKPort");
-        // transaction update interval
-        spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
-        // Kafka fetch size
-        spoutConfig.fetchSizeBytes = fetchSize;
-        // "startOffsetTime" is for test usage, prod should not use this
-        if (context.hasPath("startOffsetTime")) {
-            spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
-        }
-        // "forceFromStart" is for test usage, prod should not use this
-        if (context.hasPath("forceFromStart")) {
-            spoutConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-        }
-
-        if (context.hasPath("schemeCls")) {
-            try {
-                Scheme s = (Scheme)Class.forName(context.getString("schemeCls")).newInstance();
-                spoutConfig.scheme = new SchemeAsMultiScheme(s);
-            }catch(Exception ex){
-                LOG.error("error instantiating scheme object");
-                throw new IllegalStateException(ex);
-            }
-        }else{
-            String err = "schemeCls must be present";
-            LOG.error(err);
-            throw new IllegalStateException(err);
-        }
-        return new KafkaSpout(spoutConfig);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
deleted file mode 100644
index 04b4bed..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/datastream/utils/JavaReflections.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream.utils;
-
-import java.lang.reflect.ParameterizedType;
-
-/**
- * @since 12/7/15
- */
-class JavaReflections {
-    @SuppressWarnings("unchecked")
-    public static Class<?> getGenericTypeClass(final Object obj,int index) {
-        return (Class<?>) ((ParameterizedType) obj
-                .getClass()
-                .getGenericSuperclass()).getActualTypeArguments()[index];
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
index 72c2ae5..c386a71 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/resources/application.conf
@@ -30,7 +30,6 @@
     "zkConnectionTimeoutMS" : 15000,
     "consumerGroupId" : "eagle.consumer",
     "fetchSize" : 1048586,
-    "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
     "transactionZKServers" : "sandbox.hortonworks.com",
     "transactionZKPort" : 2181,
     "transactionZKRoot" : "/consumers",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
deleted file mode 100644
index 1d48752..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/ReflectionS.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.datastream.utils
-
-import scala.reflect.api
-import scala.reflect.runtime.{universe => ru}
-
-/**
- * @since  12/7/15
- */
-object Reflections{
-  private val UNIT_CLASS = classOf[Unit]
-  private val UNIT_TYPE_TAG = ru.typeTag[Unit]
-
-  /**
-   * Class to TypeTag
-   * @param clazz class
-   * @tparam T Type T
-   * @return
-   */
-  def typeTag[T](clazz:Class[T]):ru.TypeTag[T]={
-    if(clazz == null){
-      null
-    }else if(clazz == UNIT_CLASS) {
-      UNIT_TYPE_TAG.asInstanceOf[ru.TypeTag[T]]
-    } else {
-      val mirror = ru.runtimeMirror(clazz.getClassLoader)
-      val sym = mirror.staticClass(clazz.getCanonicalName)
-      val tpe = sym.selfType
-      ru.TypeTag(mirror, new api.TypeCreator {
-        def apply[U <: api.Universe with Singleton](m: api.Mirror[U]) =
-          if (m eq mirror) tpe.asInstanceOf[U#Type]
-          else throw new IllegalArgumentException(s"Type tag defined in $mirror cannot be migrated to other mirrors.")
-      })
-    }
-  }
-
-  def javaTypeClass[T](obj: AnyRef, index: Int = 0):Class[T] = JavaReflections.getGenericTypeClass(obj,index).asInstanceOf[Class[T]]
-  def javaTypeTag[T](obj: AnyRef, index: Int = 0):ru.TypeTag[T] = typeTag(JavaReflections.getGenericTypeClass(obj,index)).asInstanceOf[ru.TypeTag[T]]
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
deleted file mode 100644
index 3f45be7..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AbstractPolicyDefinitionEntity.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.entity;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-
-@SuppressWarnings("serial")
-public abstract class AbstractPolicyDefinitionEntity extends TaggedLogAPIEntity {
-	
-	public abstract String getPolicyDef();
-	
-	public abstract boolean isEnabled();
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
deleted file mode 100644
index 4dd9006..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertStreamSchemaEntity.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.entity;
-
-import org.apache.eagle.policy.common.Constants;
-import org.codehaus.jackson.annotate.JsonIgnoreProperties;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.Column;
-import org.apache.eagle.log.entity.meta.ColumnFamily;
-import org.apache.eagle.log.entity.meta.Prefix;
-import org.apache.eagle.log.entity.meta.Service;
-import org.apache.eagle.log.entity.meta.Table;
-import org.apache.eagle.log.entity.meta.Tags;
-import org.apache.eagle.log.entity.meta.TimeSeries;
-
-/**
- * ddl to create streammetadata table
- * 
- * create 'alertStreamSchema', {NAME => 'f', BLOOMFILTER => 'ROW', VERSIONS => '1', COMPRESSION => 'SNAPPY'}
- */
-@JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
-@Table("alertStreamSchema")
-@ColumnFamily("f")
-@Prefix("alertStreamSchema")
-@Service(Constants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME)
-@JsonIgnoreProperties(ignoreUnknown = true)
-@TimeSeries(false)
-@Tags({"application", "streamName", "attrName"})
-public class AlertStreamSchemaEntity extends TaggedLogAPIEntity{
-	@Column("a")
-	private String attrType;
-	@Column("b")
-	private String category;
-	@Column("c")
-	private String attrValueResolver;
-	/* all tags form the key for alert de-duplication */
-	@Column("d")
-	private Boolean usedAsTag;
-	@Column("e")
-	private String attrDescription;
-	@Column("f")
-	private String attrDisplayName;	
-	@Column("g")
-	private String defaultValue;
-
-	public String getAttrType() {
-		return attrType;
-	}
-	public void setAttrType(String attrType) {
-		this.attrType = attrType;
-		valueChanged("attrType");
-	}
-	public String getCategory() {
-		return category;
-	}
-	public void setCategory(String category) {
-		this.category = category;
-		valueChanged("category");
-	}
-	public String getAttrValueResolver() {
-		return attrValueResolver;
-	}
-	public void setAttrValueResolver(String attrValueResolver) {
-		this.attrValueResolver = attrValueResolver;
-		valueChanged("attrValueResolver");
-	}
-	public Boolean getUsedAsTag() {
-		return usedAsTag;
-	}
-	public void setUsedAsTag(Boolean usedAsTag) {
-		this.usedAsTag = usedAsTag;
-		valueChanged("usedAsTag");
-	}
-	public String getAttrDescription() {
-		return attrDescription;
-	}
-	public void setAttrDescription(String attrDescription) {
-		this.attrDescription = attrDescription;
-		valueChanged("attrDescription");
-	}
-	public String getAttrDisplayName() {
-		return attrDisplayName;
-	}
-	public void setAttrDisplayName(String attrDisplayName) {
-		this.attrDisplayName = attrDisplayName;
-		valueChanged("attrDisplayName");
-	}
-	public String getDefaultValue() {
-		return defaultValue;
-	}
-	public void setDefaultValue(String defaultValue) {
-		this.defaultValue = defaultValue;
-		valueChanged("defaultValue");
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
deleted file mode 100644
index 1143b11..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DefaultPolicyPartitioner.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy;
-
-
-public class DefaultPolicyPartitioner implements PolicyPartitioner{
-	@Override
-	public int partition(int numTotalPartitions, String policyType,
-			String policyId) {
-		final int prime = 31;
-		int result = 1;
-		result = result * prime + policyType.hashCode();
-		result = result < 0 ? result*-1 : result;
-		result = result * prime + policyId.hashCode();
-		result = result < 0 ? result*-1 : result;
-		return result % numTotalPartitions;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
deleted file mode 100644
index 7dad895..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.policy.executor.IPolicyExecutor;
-
-public class PolicyEvaluationContext<T extends AbstractPolicyDefinitionEntity, K> {
-	
-	public IPolicyExecutor<T, K> alertExecutor;
-	
-	public String policyId;
-	
-	public PolicyEvaluator<T> evaluator;
-	
-	public Collector outputCollector;
-	
-	public ResultRender<T, K> resultRender;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
deleted file mode 100644
index 46a63ee..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.dataproc.core.ValuesArray;
-
-/***
- * 
- * @param <T> - The policy definition entity
- */
-public interface PolicyEvaluator<T extends AbstractPolicyDefinitionEntity> {
-	/**
-	 * take input and evaluate expression
-	 * input has 3 fields, first is siddhiAlertContext, second one is streamName, the third is map of attribute name/value
-	 * @param input
-	 * @throws Exception
-	 */
-	public void evaluate(ValuesArray input) throws Exception;
-	
-	/**
-	 * notify policy evaluator that policy is updated
-	 */
-	public void onPolicyUpdate(T newAlertDef);
-	
-	/**
-	 * notify policy evaluator that policy is deleted, here is cleanup work for this policy evaluator
-	 */
-	public void onPolicyDelete();
-	
-	/**
-	 * get additional context
-	 */	
-	public Map<String, String> getAdditionalContext();
-
-	/**
-	 * Get markdown status for the policy.
-	 */
-	public boolean isMarkdownEnabled();
-
-	/**
-	 * Get markdown reason for the given policy.
-	 */
-	public String getMarkdownReason();
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
deleted file mode 100644
index fa9620c..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy;
-
-import java.io.Serializable;
-
-/**
- * partition policies so that policies can be distributed into different alert evaluators
- */
-public interface PolicyPartitioner extends Serializable {
-	int partition(int numTotalPartitions, String policyType, String policyId);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
deleted file mode 100644
index cc59880..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-
-import java.util.List;
-
-/**
- * @since Dec 17, 2015
- *
- */
-public interface ResultRender<T extends AbstractPolicyDefinitionEntity, K> {
-
-	K render(Config config, List<Object> rets, PolicyEvaluationContext<T, K> siddhiAlertContext, long timestamp);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
deleted file mode 100644
index c9d28a2..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.executor;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler;
-
-/**
- * Created on 1/10/16.
- */
-public interface IPolicyExecutor<T extends AbstractPolicyDefinitionEntity, K> extends SiddhiEvaluationHandler<T, K> {
-    String getExecutorId();
-
-    int getPartitionSeq();
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java
deleted file mode 100644
index 2e8fc55..0000000
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiEvaluationHandler.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.policy.siddhi;
-
-import org.apache.eagle.policy.PolicyEvaluationContext;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-
-import java.util.List;
-
-public interface SiddhiEvaluationHandler<T extends AbstractPolicyDefinitionEntity, K> {
-
-	void onEvalEvents(PolicyEvaluationContext<T, K> context, List<K> alerts);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties b/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
deleted file mode 100644
index d59ded6..0000000
--- a/eagle-examples/eagle-topology-example/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
index 86d8bc4..e2ac91a 100644
--- a/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
+++ b/eagle-gc/src/main/java/org/apache/eagle/gc/GCLogApplication.java
@@ -30,7 +30,7 @@ import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 import org.apache.eagle.app.sink.StormStreamSink;
-import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
 import org.apache.eagle.gc.executor.GCLogAnalyzerBolt;
 import org.apache.eagle.gc.executor.GCMetricGeneratorBolt;
 import storm.kafka.StringScheme;
@@ -47,7 +47,7 @@ public class GCLogApplication extends StormApplication{
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
         TopologyBuilder builder = new TopologyBuilder();
-        NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
+        KafkaSpoutProvider provider = new KafkaSpoutProvider();
         IRichSpout spout = provider.getSpout(config);
 
         int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/pom.xml b/eagle-hadoop-metric/pom.xml
index 0d612df..15eea00 100644
--- a/eagle-hadoop-metric/pom.xml
+++ b/eagle-hadoop-metric/pom.xml
@@ -36,28 +36,4 @@
             <version>${project.version}</version>
         </dependency>
     </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <configuration>
-                    <descriptor>src/assembly/eagle-hadoop-metric-assembly.xml</descriptor>
-                    <finalName>eagle-hadoop-metric-${project.version}</finalName>
-                </configuration>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                        <configuration>
-                            <tarLongFileMode>posix</tarLongFileMode>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml b/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml
deleted file mode 100644
index b581fbc..0000000
--- a/eagle-hadoop-metric/src/assembly/eagle-hadoop-metric-assembly.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
- 
-      http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
-          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
-    <id>assembly</id>
-    <formats>
-        <format>jar</format>
-    </formats>
-    <includeBaseDirectory>false</includeBaseDirectory>
-    <dependencySets>
-        <dependencySet>
-            <outputDirectory>/</outputDirectory>
-            <useProjectArtifact>false</useProjectArtifact>
-            <unpack>true</unpack>
-            <scope>runtime</scope>
-            <unpackOptions>
-                <excludes>
-                    <exclude>**/application.conf</exclude>
-                    <exclude>**/defaults.yaml</exclude>
-                    <exclude>**/*storm.yaml</exclude>
-                    <exclude>**/*storm.yaml.1</exclude>
-                    <exclude>**/log4j.properties</exclude>
-                </excludes>
-            </unpackOptions>
-            <excludes>
-                <exclude>org.apache.storm:storm-core</exclude>
-                <exclude>org.slf4j:slf4j-api</exclude>
-                <exclude>org.slf4j:log4j-over-slf4j</exclude>
-                <exclude>org.slf4j:slf4j-log4j12</exclude>
-                <exclude>log4j:log4j</exclude>
-                <exclude>asm:asm</exclude>
-                <exclude>org.apache.log4j.wso2:log4j</exclude>
-            </excludes>
-        </dependencySet>
-    </dependencySets>
-
-    <fileSets>
-        <fileSet>
-            <directory>${project.build.outputDirectory}</directory>
-            <outputDirectory>/</outputDirectory>
-            <excludes>
-                <exclude>application.conf</exclude>
-                <exclude>log4j.properties</exclude>
-                <exclude>**/storm.yaml.1</exclude>
-            </excludes>
-        </fileSet>
-    </fileSets>
-</assembly>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
index 20ef5d0..40d1a24 100644
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/HadoopJmxApplication.java
@@ -18,19 +18,51 @@
 package org.apache.eagle.hadoop.metric;
 
 import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
+import storm.kafka.StringScheme;
 
 /**
  * Since 8/12/16.
+ * This application just pass through data from jmx metric
+ * For persistence or alert purpose, it is not necessary to start application
+ * But keep this application in case of future business process
+ *
+ * Note: this application should be run as multiple instances based on different topic for data source
  */
 public class HadoopJmxApplication extends StormApplication {
+    public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+    public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
+    public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
+        int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+        int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
+        int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
         TopologyBuilder builder = new TopologyBuilder();
+
+        KafkaSpoutProvider provider = new KafkaSpoutProvider();
+        IRichSpout spout = provider.getSpout(config);
+        builder.setSpout("ingest", spout, numOfSpoutTasks);
+
+        JsonParserBolt bolt = new JsonParserBolt();
+        BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", bolt, numOfParserTasks);
+        boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
+
+        StormStreamSink sinkBolt = environment.getStreamSink("hadoop_jmx_stream",config);
+        BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
+        kafkaBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+
         return builder.createTopology();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java
new file mode 100644
index 0000000..7ca5ba6
--- /dev/null
+++ b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/JsonParserBolt.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.hadoop.metric;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+
+/**
+ * Since 8/14/16.
+ */
+public class JsonParserBolt extends BaseRichBolt {
+    private Logger LOG = LoggerFactory.getLogger(JsonParserBolt.class);
+    private OutputCollector collector;
+    private ObjectMapper mapper = new ObjectMapper();
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        String msg = input.getString(0);
+        try {
+            Map ret = mapper.readValue(msg, Map.class);
+            collector.emit(Arrays.asList(ret));
+        }catch(Exception ex){
+            LOG.error("error in passing json message", ex);
+        }finally{
+            collector.ack(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("msg"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java b/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java
deleted file mode 100644
index 173441c..0000000
--- a/eagle-hadoop-metric/src/main/java/org/apache/eagle/hadoop/metric/Utils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.hadoop.metric;
-
-import backtype.storm.spout.SchemeAsMultiScheme;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Created on 1/25/16.
- */
-public class Utils {
-
-    /**
-     * Creates a spout provider that have host-metric as the first tuple data, so that it's feasible for alert grouping.
-     *
-     * @param config
-     * @return
-     */
-    public static KafkaSourcedSpoutProvider createProvider(Config config) {
-        String deserClsName = config.getString("dataSourceConfig.deserializerClass");
-        final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
-
-            @Override
-            public List<Object> deserialize(byte[] ser) {
-                Object tmp = deserializer.deserialize(ser);
-                Map<String, Object> map = (Map<String, Object>) tmp;
-                if (tmp == null) return null;
-                // this is the key to be grouped by
-                return Arrays.asList(String.format("%s-%s", map.get("host"), map.get("metric")), tmp);
-            }
-
-        };
-
-        KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
-
-            @Override
-            public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
-                return new SchemeAsMultiScheme(scheme);
-            }
-
-        };
-        return provider;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-hadoop-metric/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/application.conf b/eagle-hadoop-metric/src/main/resources/application.conf
index dc1c7f3..e75355f 100644
--- a/eagle-hadoop-metric/src/main/resources/application.conf
+++ b/eagle-hadoop-metric/src/main/resources/application.conf
@@ -14,56 +14,38 @@
 # limitations under the License.
 
 {
-  "envContextConfig" : {
-    "env" : "storm",
-    "mode" : "local",
-    "topologyName" : "hadoopJmxMetricTopology",
-    "stormConfigFile" : "hadoopjmx.yaml",
-    "parallelismConfig" : {
-      "kafkaMsgConsumer" : 1,
-      "hadoopJmxMetricAlertExecutor*" : 1
-    }
+  "appId" : "HadoopJmxApplication",
+  "mode" : "LOCAL",
+  "siteId" : "testsite",
+  "topology" : {
+    "numOfSpoutTasks" : 2,
+    "numOfParserTasks" : 2,
+    "numOfSinkTasks" : 2
   },
   "dataSourceConfig": {
-    "topic" : "nn_jmx_metric_sandbox",
-    "zkConnection" : "sandbox.hortonworks.com:2181",
+    "topic" : "jmx_metric",
+    "zkConnection" : "server.eagle.apache.org:2181",
     "zkConnectionTimeoutMS" : 15000,
     "consumerGroupId" : "EagleConsumer",
     "fetchSize" : 1048586,
-    "deserializerClass" : "org.apache.eagle.datastream.storm.JsonMessageDeserializer",
-    "transactionZKServers" : "sandbox.hortonworks.com",
+    "transactionZKServers" : "server.eagle.apache.org",
     "transactionZKPort" : 2181,
     "transactionZKRoot" : "/consumers",
     "transactionStateUpdateMS" : 2000
-  },
-  "alertExecutorConfigs" : {
-     "hadoopJmxMetricAlertExecutor" : {
-       "parallelism" : 1,
-       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-       "needValidation" : "true"
-     }
+    "schemeCls" : "storm.kafka.StringScheme"
   },
   "eagleProps" : {
-    "site" : "sandbox",
-    "application": "hadoopJmxMetricDataSource",
-  	"dataJoinPollIntervalSec" : 30,
-    "mailHost" : "mailHost.com",
-    "mailSmtpPort":"25",
-    "mailDebug" : "true",
-    "balancePartitionEnabled" : true,
-    #"partitionRefreshIntervalInMin" : 60,
-    #"kafkaStatisticRangeInMin" : 60,
     "eagleService": {
       "host": "localhost",
-      "port": 9099,
+      "port": 9090,
       "username": "admin",
       "password": "secret"
     }
-    "readHdfsUserCommandPatternFrom" : "file"
   },
-  "dynamicConfigSource" : {
-  	"enabled" : true,
-  	"initDelayMillis" : 0,
-  	"delayMillis" : 30000
+  "dataSinkConfig": {
+    "topic" : "jmx_metric_parsed",
+    "brokerList" : "server.eagle.apache.org:6667",
+    "serializerClass" : "kafka.serializer.StringEncoder",
+    "keySerializerClass" : "kafka.serializer.StringEncoder"
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
index c738b90..4ef50ea 100644
--- a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/EagleMetricCollectorApplication.java
@@ -17,7 +17,6 @@
 package org.apache.eagle.metric.kafka;
 
 import backtype.storm.generated.StormTopology;
-import backtype.storm.spout.SchemeAsMultiScheme;
 import backtype.storm.topology.BoltDeclarer;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.topology.base.BaseRichSpout;
@@ -26,16 +25,8 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import storm.kafka.BrokerHosts;
-import storm.kafka.KafkaSpout;
-import storm.kafka.SpoutConfig;
-import storm.kafka.ZkHosts;
-
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -62,69 +53,9 @@ public class EagleMetricCollectorApplication extends StormApplication{
             }
         };
 
-        // TODO: Refactored the anonymous in to independen class file, avoiding too complex logic in main method
-        KafkaSourcedSpoutProvider kafkaMessageSpoutProvider = new KafkaSourcedSpoutProvider() {
-            @Override
-            public BaseRichSpout getSpout(Config context) {
-                // Kafka topic
-                String topic = context.getString("dataSourceConfig.topic");
-                // Kafka consumer group id
-                String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
-                // Kafka fetch size
-                int fetchSize = context.getInt("dataSourceConfig.fetchSize");
-                // Kafka deserializer class
-                String deserClsName = context.getString("dataSourceConfig.deserializerClass");
-
-                // Kafka broker zk connection
-                String zkConnString = context.getString("dataSourceConfig.zkQuorum");
-
-                // transaction zkRoot
-                String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
-
-                LOG.info(String.format("Use topic id: %s",topic));
-
-                String brokerZkPath = null;
-                if(context.hasPath("dataSourceConfig.brokerZkPath")) {
-                    brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
-                }
-
-                BrokerHosts hosts;
-                if(brokerZkPath == null) {
-                    hosts = new ZkHosts(zkConnString);
-                } else {
-                    hosts = new ZkHosts(zkConnString, brokerZkPath);
-                }
-
-                SpoutConfig spoutConfig = new SpoutConfig(hosts,
-                        topic,
-                        zkRoot + "/" + topic,
-                        groupId);
-
-                // transaction zkServers
-                String[] zkConnections = zkConnString.split(",");
-                List<String> zkHosts = new ArrayList<>();
-                for (String zkConnection : zkConnections) {
-                    zkHosts.add(zkConnection.split(":")[0]);
-                }
-                Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
-
-                spoutConfig.zkServers = zkHosts;
-                // transaction zkPort
-                spoutConfig.zkPort = zkPort;
-                // transaction update interval
-                spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
-                // Kafka fetch size
-                spoutConfig.fetchSizeBytes = fetchSize;
-
-                spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
-                KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
-                return kafkaSpout;
-            }
-        };
-
         TopologyBuilder builder = new TopologyBuilder();
         BaseRichSpout spout1 = new KafkaOffsetSourceSpoutProvider().getSpout(config);
-        BaseRichSpout spout2 = kafkaMessageSpoutProvider.getSpout(config);
+        BaseRichSpout spout2 = KafkaSourcedSpoutProvider.getSpout(config, scheme);
 
         int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
         int numOfDistributionTasks = config.getInt(DISTRIBUTION_TASK_NUM);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java
new file mode 100644
index 0000000..382ae1a
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutProvider.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.spout.SchemeAsMultiScheme;
+import backtype.storm.topology.base.BaseRichSpout;
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import storm.kafka.BrokerHosts;
+import storm.kafka.KafkaSpout;
+import storm.kafka.SpoutConfig;
+import storm.kafka.ZkHosts;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Since 8/14/16.
+ */
+public class KafkaSourcedSpoutProvider {
+    private final static Logger LOG = LoggerFactory.getLogger(KafkaSourcedSpoutProvider.class);
+    public static BaseRichSpout getSpout(Config context, Scheme scheme) {
+        // Kafka topic
+        String topic = context.getString("dataSourceConfig.topic");
+        // Kafka consumer group id
+        String groupId = context.getString("dataSourceConfig.metricCollectionConsumerId");
+        // Kafka fetch size
+        int fetchSize = context.getInt("dataSourceConfig.fetchSize");
+        // Kafka deserializer class
+        String deserClsName = context.getString("dataSourceConfig.deserializerClass");
+
+        // Kafka broker zk connection
+        String zkConnString = context.getString("dataSourceConfig.zkQuorum");
+
+        // transaction zkRoot
+        String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
+
+        LOG.info(String.format("Use topic id: %s",topic));
+
+        String brokerZkPath = null;
+        if(context.hasPath("dataSourceConfig.brokerZkPath")) {
+            brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
+        }
+
+        BrokerHosts hosts;
+        if(brokerZkPath == null) {
+            hosts = new ZkHosts(zkConnString);
+        } else {
+            hosts = new ZkHosts(zkConnString, brokerZkPath);
+        }
+
+        SpoutConfig spoutConfig = new SpoutConfig(hosts,
+                topic,
+                zkRoot + "/" + topic,
+                groupId);
+
+        // transaction zkServers
+        String[] zkConnections = zkConnString.split(",");
+        List<String> zkHosts = new ArrayList<>();
+        for (String zkConnection : zkConnections) {
+            zkHosts.add(zkConnection.split(":")[0]);
+        }
+        Integer zkPort = Integer.valueOf(zkConnections[0].split(":")[1]);
+
+        spoutConfig.zkServers = zkHosts;
+        // transaction zkPort
+        spoutConfig.zkPort = zkPort;
+        // transaction update interval
+        spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
+        // Kafka fetch size
+        spoutConfig.fetchSizeBytes = fetchSize;
+
+        spoutConfig.scheme = new SchemeAsMultiScheme(scheme);
+        KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
+        return kafkaSpout;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java
new file mode 100644
index 0000000..14b2384
--- /dev/null
+++ b/eagle-security/eagle-metric-collection/src/main/java/org/apache/eagle/metric/kafka/KafkaSourcedSpoutScheme.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metric.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
+import org.apache.eagle.datastream.utils.NameConstants;
+
+import java.lang.reflect.Constructor;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * This scheme defines how a kafka message is deserialized and the output field name for storm stream
+ * it includes the following:
+ * 1. data source is kafka, so need kafka message deserializer class
+ * 2. output field declaration
+ */
+public class KafkaSourcedSpoutScheme implements Scheme {
+	protected SpoutKafkaMessageDeserializer deserializer;
+	
+	public KafkaSourcedSpoutScheme(String deserClsName, Config context){
+		try{
+			Properties prop = new Properties();
+            if(context.hasPath("eagleProps")) {
+                prop.putAll(context.getObject("eagleProps"));
+            }
+			Constructor<?> constructor =  Class.forName(deserClsName).getConstructor(Properties.class);
+			deserializer = (SpoutKafkaMessageDeserializer) constructor.newInstance(prop);
+		}catch(Exception ex){
+			throw new RuntimeException("Failed to create new instance for decoder class " + deserClsName, ex);
+		}
+	}
+	
+	@Override
+	public List<Object> deserialize(byte[] ser) {
+		Object tmp = deserializer.deserialize(ser);
+		if(tmp == null)
+			return null;
+		// the following tasks are executed within the same process of kafka spout
+		return Arrays.asList(tmp);
+	}
+
+    /**
+     * Default only f0, but it requires to be overrode if different
+     *
+     * TODO: Handle the schema with KeyValue based structure
+     *
+     * @return Fields
+     */
+	@Override
+	public Fields getOutputFields() {
+        return new Fields(NameConstants.FIELD_PREFIX()+"0");
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b4732cb2/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
index 432043f..f5753cf 100644
--- a/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
+++ b/eagle-security/eagle-security-hbase-auditlog/src/main/java/org/apache/eagle/security/hbase/HBaseAuditLogApplication.java
@@ -26,7 +26,7 @@ import com.typesafe.config.ConfigFactory;
 import org.apache.eagle.app.StormApplication;
 import org.apache.eagle.app.environment.impl.StormEnvironment;
 import org.apache.eagle.app.sink.StormStreamSink;
-import org.apache.eagle.dataproc.impl.storm.kafka.NewKafkaSourcedSpoutProvider;
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSpoutProvider;
 
 /**
  * Since 7/27/16.
@@ -40,7 +40,7 @@ public class HBaseAuditLogApplication extends StormApplication {
     @Override
     public StormTopology execute(Config config, StormEnvironment environment) {
         TopologyBuilder builder = new TopologyBuilder();
-        NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
+        KafkaSpoutProvider provider = new KafkaSpoutProvider();
         IRichSpout spout = provider.getSpout(config);
 
         HBaseAuditLogParserBolt bolt = new HBaseAuditLogParserBolt();