You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/01/20 07:48:21 UTC

[1/4] incubator-eagle git commit: EAGLE-130 Eagle Pipeline DSL: Parser, Compiler, Runner

Repository: incubator-eagle
Updated Branches:
  refs/heads/master f6c63e78a -> c1485aac5


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
index 5bb2aff..92cc206 100644
--- a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
+++ b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityDataJoinExecutor.java
@@ -21,7 +21,7 @@ import com.typesafe.config.Config;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
 import org.apache.eagle.datastream.Tuple2;
-import org.apache.eagle.security.hbase.HbaseResourceSensitivityAPIEntity;
+import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.util.ExternalDataJoiner;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityPollingJob.java b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityPollingJob.java
index f2df58c..a5d3978 100644
--- a/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hbase-securitylog/src/main/java/org/apache/eagle/security/hbase/sensitivity/HbaseResourceSensitivityPollingJob.java
@@ -19,10 +19,7 @@ package org.apache.eagle.security.hbase.sensitivity;
 
 import com.google.common.base.Function;
 import com.google.common.collect.Maps;
-import org.apache.eagle.security.hbase.HbaseResourceSensitivityAPIEntity;
-import org.apache.eagle.security.util.AbstractResourceSensitivityPollingJob;
-import org.apache.eagle.security.util.ExternalDataCache;
-import org.apache.eagle.security.hbase.HbaseResourceSensitivityAPIEntity;
+import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
 import org.apache.eagle.security.util.AbstractResourceSensitivityPollingJob;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.quartz.Job;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
index 4a0bdf8..5dc98ca 100644
--- a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
+++ b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResource.java
@@ -16,7 +16,7 @@
  */
 package org.apache.eagle.service.security.hbase;
 
-import org.apache.eagle.security.hbase.HbaseResourceEntity;
+import org.apache.eagle.security.entity.HbaseResourceEntity;
 import org.apache.eagle.service.common.EagleExceptionWrapper;
 import org.apache.eagle.service.security.hbase.dao.HbaseMetadataAccessConfig;
 import org.apache.eagle.service.security.hbase.dao.HbaseMetadataAccessConfigDAOImpl;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResponse.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResponse.java b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResponse.java
index 5a7455a..960a411 100644
--- a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResponse.java
+++ b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseMetadataBrowseWebResponse.java
@@ -18,7 +18,7 @@
 package org.apache.eagle.service.security.hbase;
 
 
-import org.apache.eagle.security.hbase.HbaseResourceEntity;
+import org.apache.eagle.security.entity.HbaseResourceEntity;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseSensitivityResourceService.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseSensitivityResourceService.java b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseSensitivityResourceService.java
index 8a1c551..f707bb1 100644
--- a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseSensitivityResourceService.java
+++ b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/HbaseSensitivityResourceService.java
@@ -19,7 +19,7 @@ package org.apache.eagle.service.security.hbase;
 
 
 import org.apache.eagle.log.entity.ListQueryAPIResponseEntity;
-import org.apache.eagle.security.hbase.HbaseResourceSensitivityAPIEntity;
+import org.apache.eagle.security.entity.HbaseResourceSensitivityAPIEntity;
 import org.apache.eagle.service.generic.ListQueryResource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/dao/HbaseMetadataAccessConfigDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/dao/HbaseMetadataAccessConfigDAOImpl.java b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/dao/HbaseMetadataAccessConfigDAOImpl.java
index 56090c5..1378032 100644
--- a/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/dao/HbaseMetadataAccessConfigDAOImpl.java
+++ b/eagle-security/eagle-security-hbase-web/src/main/java/org/apache/eagle/service/security/hbase/dao/HbaseMetadataAccessConfigDAOImpl.java
@@ -50,8 +50,7 @@ public class HbaseMetadataAccessConfigDAOImpl {
         /* parameters are: query, startTime, endTime, pageSzie, startRowkey, treeAgg, timeSeries, intervalmin, top, filterIfMissing,
         * parallel, metricName*/
         String queryFormat = "AlertDataSourceService[@dataSource=\"hbaseSecurityLog\" AND @site=\"%s\"]{*}";
-        ListQueryAPIResponseEntity ret = resource.listQuery(String.format(queryFormat, site), null, null, Integer.MAX_VALUE, null, false, false, 0L, 0, false,
-                0, null);
+        ListQueryAPIResponseEntity ret = resource.listQuery(String.format(queryFormat, site), null, null, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null);
         List<AlertDataSourceEntity> list = (List<AlertDataSourceEntity>) ret.getObj();
         if(list == null || list.size() ==0)
             throw new BadMetadataAccessConfigException("config is empty for site " + site);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
index a988432..60b7b57 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
@@ -22,10 +22,9 @@ import org.apache.eagle.datastream.JavaStormStreamExecutor2;
 import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.auditlog.timer.FileSensitivityPollingJob;
 import org.apache.eagle.security.auditlog.util.SimplifyPath;
-import org.apache.eagle.security.hdfs.entity.FileSensitivityAPIEntity;
+import org.apache.eagle.security.entity.FileSensitivityAPIEntity;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.util.ExternalDataJoiner;
-import org.apache.storm.guava.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternByDBImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternByDBImpl.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternByDBImpl.java
index f8fe0e8..e3123e3 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternByDBImpl.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternByDBImpl.java
@@ -17,7 +17,7 @@
 package org.apache.eagle.security.auditlog;
 
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.security.hdfs.entity.HdfsUserCommandPatternEntity;
+import org.apache.eagle.security.entity.HdfsUserCommandPatternEntity;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternByFileImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternByFileImpl.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternByFileImpl.java
index bdb63d5..8a5168d 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternByFileImpl.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternByFileImpl.java
@@ -18,7 +18,7 @@
  */
 package org.apache.eagle.security.auditlog;
 
-import org.apache.eagle.security.hdfs.entity.HdfsUserCommandPatternEntity;
+import org.apache.eagle.security.entity.HdfsUserCommandPatternEntity;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.type.TypeReference;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternDAO.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternDAO.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternDAO.java
index e1bcf93..a06616b 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternDAO.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandPatternDAO.java
@@ -19,7 +19,7 @@
 
 package org.apache.eagle.security.auditlog;
 
-import org.apache.eagle.security.hdfs.entity.HdfsUserCommandPatternEntity;
+import org.apache.eagle.security.entity.HdfsUserCommandPatternEntity;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
index 1f56be9..ea1ea8d 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsUserCommandReassembler.java
@@ -24,7 +24,7 @@ import org.apache.eagle.policy.siddhi.SiddhiStreamMetadataUtils;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
 import org.apache.eagle.datastream.Tuple2;
-import org.apache.eagle.security.hdfs.entity.HdfsUserCommandPatternEntity;
+import org.apache.eagle.security.entity.HdfsUserCommandPatternEntity;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
index 5fbcfd8..b98f0cf 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
@@ -21,10 +21,9 @@ import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
 import org.apache.eagle.datastream.Tuple2;
 import org.apache.eagle.security.auditlog.timer.IPZonePollingJob;
-import org.apache.eagle.security.hdfs.entity.IPZoneEntity;
+import org.apache.eagle.security.entity.IPZoneEntity;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.util.ExternalDataJoiner;
-import org.apache.storm.guava.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
index 371c706..a28e532 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
@@ -21,8 +21,7 @@ import java.util.Map;
 
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.security.util.ExternalDataCache;
-import org.apache.eagle.security.hdfs.entity.FileSensitivityAPIEntity;
-import org.apache.eagle.security.util.ExternalDataCache;
+import org.apache.eagle.security.entity.FileSensitivityAPIEntity;
 import org.quartz.Job;
 import org.quartz.JobDataMap;
 import org.quartz.JobExecutionContext;
@@ -31,7 +30,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.security.hdfs.entity.FileSensitivityAPIEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import com.google.common.base.Function;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
index d20a8dd..ca6dc16 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
@@ -21,7 +21,6 @@ import java.util.Map;
 
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.security.util.ExternalDataCache;
-import org.apache.eagle.security.util.ExternalDataCache;
 import org.quartz.Job;
 import org.quartz.JobDataMap;
 import org.quartz.JobExecutionContext;
@@ -30,7 +29,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.security.hdfs.entity.IPZoneEntity;
+import org.apache.eagle.security.entity.IPZoneEntity;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
 import com.google.common.base.Function;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByDB.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByDB.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByDB.java
index 03a6507..57be7ea 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByDB.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByDB.java
@@ -21,7 +21,7 @@ package org.apache.eagle.security.auditlog;
 
 import java.util.List;
 
-import org.apache.eagle.security.hdfs.entity.HdfsUserCommandPatternEntity;
+import org.apache.eagle.security.entity.HdfsUserCommandPatternEntity;
 import org.apache.eagle.service.client.EagleServiceConnector;
 import org.junit.Ignore;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByFile.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByFile.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByFile.java
index 2886372..4076185 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByFile.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestHdfsUserCommandPatternByFile.java
@@ -19,8 +19,7 @@
 
 package org.apache.eagle.security.auditlog;
 
-import org.apache.eagle.security.hdfs.entity.HdfsUserCommandPatternEntity;
-import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.security.entity.HdfsUserCommandPatternEntity;
 import org.junit.Test;
 
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSResourceSensitivityDataJoiner.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSResourceSensitivityDataJoiner.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSResourceSensitivityDataJoiner.java
index 8ffe3a0..b74a185 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSResourceSensitivityDataJoiner.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSResourceSensitivityDataJoiner.java
@@ -25,7 +25,7 @@ import java.util.Set;
 
 import org.apache.hadoop.fs.FileStatus;
 
-import org.apache.eagle.security.hdfs.entity.FileStatusEntity;
+import org.apache.eagle.security.entity.FileStatusEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSResourceSensitivityService.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSResourceSensitivityService.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSResourceSensitivityService.java
index c1dd968..283e25a 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSResourceSensitivityService.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/HDFSResourceSensitivityService.java
@@ -23,7 +23,7 @@ import java.util.Map;
 
 import org.apache.eagle.log.entity.ListQueryAPIResponseEntity;
 import org.apache.eagle.service.generic.ListQueryResource;
-import org.apache.eagle.security.hdfs.entity.FileSensitivityAPIEntity;
+import org.apache.eagle.security.entity.FileSensitivityAPIEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResource.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResource.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResource.java
index e65d492..588979f 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResource.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResource.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.eagle.service.security.hdfs.HDFSFileSystem;
 import org.apache.eagle.service.security.hdfs.HDFSResourceUtils;
-import org.apache.eagle.security.hdfs.entity.FileStatusEntity;
+import org.apache.eagle.security.entity.FileStatusEntity;
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResponse.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResponse.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResponse.java
index 817a8b9..dbdefbf 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResponse.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/rest/HDFSResourceWebResponse.java
@@ -16,7 +16,7 @@
  */
 package org.apache.eagle.service.security.hdfs.rest;
 
-import org.apache.eagle.security.hdfs.entity.FileStatusEntity;
+import org.apache.eagle.security.entity.FileStatusEntity;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/dao/HiveSensitivityMetadataDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/dao/HiveSensitivityMetadataDAOImpl.java b/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/dao/HiveSensitivityMetadataDAOImpl.java
index 2310120..e948e31 100644
--- a/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/dao/HiveSensitivityMetadataDAOImpl.java
+++ b/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/dao/HiveSensitivityMetadataDAOImpl.java
@@ -18,7 +18,7 @@ package org.apache.eagle.service.security.hive.dao;
 
 import org.apache.eagle.log.entity.ListQueryAPIResponseEntity;
 import org.apache.eagle.service.generic.ListQueryResource;
-import org.apache.eagle.security.hive.entity.HiveResourceSensitivityAPIEntity;
+import org.apache.eagle.security.entity.HiveResourceSensitivityAPIEntity;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/res/HiveMetadataBrowseWebResource.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/res/HiveMetadataBrowseWebResource.java b/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/res/HiveMetadataBrowseWebResource.java
index dc58c40..7139000 100644
--- a/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/res/HiveMetadataBrowseWebResource.java
+++ b/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/res/HiveMetadataBrowseWebResource.java
@@ -17,7 +17,7 @@
 package org.apache.eagle.service.security.hive.res;
 
 import org.apache.eagle.service.common.EagleExceptionWrapper;
-import org.apache.eagle.security.hive.entity.HiveResourceEntity;
+import org.apache.eagle.security.entity.HiveResourceEntity;
 import org.apache.eagle.service.security.hive.dao.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/res/HiveMetadataBrowseWebResponse.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/res/HiveMetadataBrowseWebResponse.java b/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/res/HiveMetadataBrowseWebResponse.java
index 66cf13a..cf9b39d 100644
--- a/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/res/HiveMetadataBrowseWebResponse.java
+++ b/eagle-security/eagle-security-hive-web/src/main/java/org/apache/eagle/service/security/hive/res/HiveMetadataBrowseWebResponse.java
@@ -17,7 +17,7 @@
 package org.apache.eagle.service.security.hive.res;
 
 
-import org.apache.eagle.security.hive.entity.HiveResourceEntity;
+import org.apache.eagle.security.entity.HiveResourceEntity;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java
index 7b886b2..56c694d 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityDataJoinExecutor.java
@@ -20,7 +20,7 @@ import com.typesafe.config.Config;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
 import org.apache.eagle.datastream.Tuple2;
-import org.apache.eagle.security.hive.entity.HiveResourceSensitivityAPIEntity;
+import org.apache.eagle.security.entity.HiveResourceSensitivityAPIEntity;
 import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.security.util.ExternalDataJoiner;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
index e8fda53..ff7462c 100644
--- a/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hive/src/main/java/org/apache/eagle/security/hive/sensitivity/HiveResourceSensitivityPollingJob.java
@@ -21,10 +21,9 @@ import com.google.common.collect.Maps;
 import org.apache.eagle.common.config.EagleConfigConstants;
 import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
 import org.apache.eagle.security.util.ExternalDataCache;
-import org.apache.eagle.security.util.ExternalDataCache;
 import org.apache.eagle.service.client.IEagleServiceClient;
 import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
-import org.apache.eagle.security.hive.entity.HiveResourceSensitivityAPIEntity;
+import org.apache.eagle.security.entity.HiveResourceSensitivityAPIEntity;
 import org.quartz.Job;
 import org.quartz.JobDataMap;
 import org.quartz.JobExecutionContext;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-topology-assembly/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-topology-assembly/pom.xml b/eagle-topology-assembly/pom.xml
index 2132bb3..d8753f7 100644
--- a/eagle-topology-assembly/pom.xml
+++ b/eagle-topology-assembly/pom.xml
@@ -59,6 +59,11 @@
             <artifactId>eagle-metric-collection</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-stream-pipeline</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-webservice/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-webservice/pom.xml b/eagle-webservice/pom.xml
index 1f65842..faef5fa 100644
--- a/eagle-webservice/pom.xml
+++ b/eagle-webservice/pom.xml
@@ -175,6 +175,11 @@
 			<version>${project.version}</version>
 		</dependency>
 		<dependency>
+			<groupId>eagle</groupId>
+			<artifactId>eagle-stream-process-dsl</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
 			<groupId>org.wso2.siddhi</groupId>
 			<artifactId>siddhi-core</artifactId>
 			<exclusions>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-webservice/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/resources/application.conf b/eagle-webservice/src/main/resources/application.conf
index 6b7c8bc..c67ea3c 100644
--- a/eagle-webservice/src/main/resources/application.conf
+++ b/eagle-webservice/src/main/resources/application.conf
@@ -16,10 +16,10 @@
 eagle{
 	service{
 		storage-type="hbase"
-		hbase-zookeeper-quorum="www.xyz.com"
+		hbase-zookeeper-quorum="sandbox.hortonworks.com"
 		hbase-zookeeper-property-clientPort=2181
-		zookeeper-znode-parent="/hbase",
+		zookeeper-znode-parent="/hbase-unsecure",
 		springActiveProfile="sandbox"
 		audit-enabled=true
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 24375a3..ca82337 100755
--- a/pom.xml
+++ b/pom.xml
@@ -197,6 +197,8 @@
         <log4j-over-slf4j.version>1.7.2</log4j-over-slf4j.version>
         <quartz.version>2.2.1</quartz.version>
         <scopt.version>3.3.0</scopt.version>
+        <akka.actor.version>2.3.14</akka.actor.version>
+        <reflections.version>0.9.10</reflections.version>
 
         <!-- Streaming -->
         <kafka.version>0.8.1.2.2.0.0-2041</kafka.version>
@@ -370,6 +372,12 @@
                 <version>${scopt.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.reflections</groupId>
+                <artifactId>reflections</artifactId>
+                <version>${reflections.version}</version>
+            </dependency>
+
             <!-- Serialization -->
             <dependency>
                 <groupId>com.fasterxml.jackson.core</groupId>


[2/4] incubator-eagle git commit: EAGLE-130 Eagle Pipeline DSL: Parser, Compiler, Runner

Posted by ha...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
index 11ec46d..1f753d1 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
@@ -23,9 +23,11 @@ import backtype.storm.generated.StormTopology
 import backtype.storm.utils.Utils
 import backtype.storm.{Config, LocalCluster, StormSubmitter}
 import org.apache.eagle.datastream.core.AbstractTopologyExecutor
+import org.slf4j.LoggerFactory
 import org.yaml.snakeyaml.Yaml
 
 case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesafe.config.Config) extends AbstractTopologyExecutor {
+  val LOG = LoggerFactory.getLogger(classOf[StormTopologyExecutorImpl])
   @throws(classOf[Exception])
   def execute {
     val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase("local")
@@ -45,7 +47,10 @@ case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesa
           val stormConf = yaml.load(inputFileStream).asInstanceOf[java.util.LinkedHashMap[String, Object]]
           if(stormConf != null) conf.putAll(stormConf)
         } catch {
-          case _: Throwable => ()
+          case t: Throwable => {
+            LOG.error(s"Got example $t",t)
+            throw t
+          }
         } finally {
           if(inputFileStream != null) inputFileStream.close()
         }
@@ -54,8 +59,10 @@ case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesa
 
     val topologyName = config.getString("envContextConfig.topologyName")
     if (!localMode) {
+      LOG.info("Submitting as cluster mode")
       StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, topology)
     } else {
+      LOG.info("Submitting as local mode")
       val cluster: LocalCluster = new LocalCluster
       cluster.submitTopology(topologyName, conf, topology)
       while(true) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
index b43f42e..ccd3deb 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestExecutionEnvironment.scala
@@ -27,7 +27,7 @@ object TestExecutionEnvironment extends App{
   val env0 = ExecutionEnvironments.get[StormExecutionEnvironment]
   println(env0)
   val config = ConfigFactory.load()
-  val env1 = ExecutionEnvironments.get[StormExecutionEnvironment](config)
+  val env1 = ExecutionEnvironments.getWithConfig[StormExecutionEnvironment](config)
   println(env1)
   val env2 = ExecutionEnvironments.get[StormExecutionEnvironment](Array[String]("-D","key=value"))
   println(env2)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
index cf95304..7754765 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/test/scala/org/apache/eagle/datastream/TestTypeSafedDSL.scala
@@ -18,6 +18,7 @@
 
 package org.apache.eagle.datastream
 
+import org.apache.eagle.datastream.core.StreamContext
 import org.apache.eagle.datastream.storm.StormExecutionEnvironment
 
 /**
@@ -51,8 +52,34 @@ object TestIterableWithGroupBy extends App {
   env.execute()
 }
 
+object TestIterableWithGroupByWithStreamContext extends App {
+  val stream = StreamContext(args)
+
+  val tuples = Seq(
+    Entity("a", 1),
+    Entity("a", 2),
+    Entity("a", 3),
+    Entity("b", 2),
+    Entity("c", 3),
+    Entity("d", 3)
+  )
+
+  stream.from(tuples)
+    .groupByKey(_.name)
+    .map(o => {o.inc += 2;o})
+    .filter(_.name != "b")
+    .filter(_.name != "c")
+    .groupByKey(o=>(o.name,o.value))
+    .map(o => (o.name,o))
+    .map(o => (o._1,o._2.value,o._2.inc))
+    .foreach(println)
+
+  stream.submit[StormExecutionEnvironment]
+}
+
 object TestIterableWithGroupByCircularly extends App{
   val env = ExecutionEnvironments.get[StormExecutionEnvironment](args)
+
   val tuples = Seq(
     Entity("a", 1),
     Entity("a", 2),
@@ -61,6 +88,7 @@ object TestIterableWithGroupByCircularly extends App{
     Entity("c", 3),
     Entity("d", 3)
   )
+
   env.from(tuples,recycle = true)
     .map(o => {o.inc += 2;o})
     .groupByKey(_.name)

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
index 19853ef..f40bb76 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/AbstractConfigOptionParser.java
@@ -27,12 +27,12 @@ import java.util.Map;
  */
 public abstract class AbstractConfigOptionParser {
 
-    private final Options options;
+    // private final Options options;
     private final Parser parser;
 
     public AbstractConfigOptionParser(){
         parser = parser();
-        options = options();
+        //options = options();
     }
 
     /**
@@ -63,6 +63,6 @@ public abstract class AbstractConfigOptionParser {
     }
 
     public CommandLine parse(String[] arguments) throws ParseException {
-        return this.parser.parse(this.options,arguments);
+        return this.parser.parse(this.options(),arguments);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
index 0bd7559..bbd4e38 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/java/org/apache/eagle/dataproc/util/ConfigOptionParser.java
@@ -40,7 +40,10 @@ public class ConfigOptionParser extends AbstractConfigOptionParser {
     @Override
     public Map<String,String> parseConfig(String[] arguments) throws ParseException {
         CommandLine cmd = parse(arguments);
+        return parseCommand(cmd);
+    }
 
+    protected Map<String,String> parseCommand(CommandLine cmd) throws ParseException {
         Map<String,String> result = new HashMap<>();
         if(cmd.hasOption(CONFIG_OPT_FLAG)){
             String[] values = cmd.getOptionValues(CONFIG_OPT_FLAG);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala
index 2532063..5361001 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/Collector.scala
@@ -16,6 +16,6 @@
  */
 package org.apache.eagle.datastream
 
-trait Collector[R] {
+trait Collector[-R] {
   def collect(r : R);
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
index 8569ae5..ddea46f 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-base/src/main/scala/org/apache/eagle/datastream/FlatMapper.scala
@@ -18,4 +18,8 @@ package org.apache.eagle.datastream
 
 trait FlatMapper[T] extends Serializable {
   def flatMap(input : Seq[AnyRef], collector : Collector[T])
+}
+
+case class FlatMapperWrapper[T](func:(Any,Collector[T]) => Unit) extends FlatMapper[T]{
+  override def flatMap(input: Seq[AnyRef], collector: Collector[T]): Unit = func(input,collector)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/pom.xml b/eagle-core/eagle-data-process/pom.xml
index 1b9fb57..90b125b 100644
--- a/eagle-core/eagle-data-process/pom.xml
+++ b/eagle-core/eagle-data-process/pom.xml
@@ -15,6 +15,7 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
+
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
@@ -29,6 +30,7 @@
         <module>eagle-stream-process-base</module>
     	<module>eagle-storm-jobrunning-spout</module>
     	<module>eagle-job-common</module>
-    <module>eagle-stream-process-api</module>
-  </modules>
+        <module>eagle-stream-process-api</module>
+        <module>eagle-stream-pipeline</module>
+    </modules>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
index 97f18f7..ca9d334 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertExecutorEntity.java
@@ -22,6 +22,9 @@ import org.apache.eagle.log.entity.meta.*;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
+/**
+ * Stream and Alert executor Id mapping should be automatically created by topology definition
+ */
 @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
 @Table("alertExecutor")
 @ColumnFamily("f")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
index 024e400..13db689 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiPolicyEvaluator.java
@@ -42,16 +42,16 @@ import java.util.*;
  * during this time, synchronization is important
  */
 public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K> implements PolicyEvaluator<T>{
-	
+
 	private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class);
-	
+
 	private volatile SiddhiRuntime siddhiRuntime;
 	private String[] sourceStreams;
 	private boolean needValidation;
 	private String policyId;
 	private Config config;
 	private final static String EXECUTION_PLAN_NAME = "query";
-	
+
 	/**
 	 * everything dependent on policyDef should be together and switched in runtime
 	 */
@@ -63,24 +63,24 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 		List<String> outputFields;
 		String executionPlanName;
 	}
-	
+
 	public SiddhiPolicyEvaluator(Config config, String policyName, AbstractPolicyDefinition policyDef, String[] sourceStreams){
 		this(config, policyName, policyDef, sourceStreams, false);
 	}
-	
+
 	public SiddhiPolicyEvaluator(Config config, String policyId, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){
 		this.config = config;
 		this.policyId = policyId;
 		this.needValidation = needValidation;
-		this.sourceStreams = sourceStreams; 
+		this.sourceStreams = sourceStreams;
 		init(policyDef);
 	}
-	
-	public void init(AbstractPolicyDefinition policyDef){			
+
+	public void init(AbstractPolicyDefinition policyDef){
 		siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef);
 	}
 
-	public static String addContextFieldIfNotExist(String expression) {		
+	public static String addContextFieldIfNotExist(String expression) {
 		// select fieldA, fieldB --> select eagleAlertContext, fieldA, fieldB
 		int pos = expression.indexOf("select ") + 7;
 		int index = pos;
@@ -103,7 +103,7 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 
 	private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition policyDef){
 		SiddhiManager siddhiManager = new SiddhiManager();
-		Map<String, InputHandler> siddhiInputHandlers = new HashMap<String, InputHandler>();
+		Map<String, InputHandler> siddhiInputHandlers = new HashMap<>();
 
 		// compose execution plan sql
 		String executionPlan = policyDef.getExpression();
@@ -120,13 +120,14 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 		}
 
 		ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-		
-		for(String sourceStream : sourceStreams){			
+
+		for(String sourceStream : sourceStreams){
 			siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream));
 		}
+
 		executionPlanRuntime.start();
 
-		QueryCallback callback = new SiddhiQueryCallbackImpl<T, K>(config, this);		
+		QueryCallback callback = new SiddhiQueryCallbackImpl<T, K>(config, this);
 
 		LOG.info("Siddhi query: " + executionPlan);
 		executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback);
@@ -137,7 +138,7 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 	        field.setAccessible(true);
 	        Query query = (Query)field.get(callback);
 	        List<OutputAttribute> list = query.getSelector().getSelectionList();
-	        for (OutputAttribute output : list) {	        	
+	        for (OutputAttribute output : list) {
 	        	outputFields.add(output.getRename());
 	        }
 		}
@@ -153,7 +154,7 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 		runtime.executionPlanName = executionPlanRuntime.getName();
 		return runtime;
 	}
-	
+
 	/**
 	 * 1. input has 3 fields, first is siddhi context, second is streamName, the last one is map of attribute name/value
 	 * 2. runtime check for input data (This is very expensive, so we ignore for now)
@@ -173,8 +174,13 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 			//insert siddhiAlertContext into the first field
 			List<Object> input = new ArrayList<>();
 			input.add(siddhiAlertContext);
+			// input.add(streamName);
 			putAttrsIntoInputStream(input, streamName, map);
-			siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0]));
+            try {
+                siddhiRuntime.siddhiInputHandlers.get(streamName).send(input.toArray(new Object[0]));
+            }catch (InterruptedException ex){
+                LOG.error("Got exception "+ex.getMessage(),ex);
+            }
 		}
 	}
 
@@ -218,7 +224,7 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 	public void onPolicyUpdate(T newAlertDef) {
 		AbstractPolicyDefinition policyDef = null;
 		try {
-			policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(), 
+			policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(),
 					AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(Constants.POLICY_TYPE)));
 		}
 		catch (Exception ex) {
@@ -230,7 +236,7 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 			previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown();
 		}
 	}
-	
+
 	@Override
 	public void onPolicyDelete(){
 		synchronized(siddhiRuntime){
@@ -239,12 +245,12 @@ public class SiddhiPolicyEvaluator<T extends AbstractPolicyDefinitionEntity, K>
 			LOG.info("Siddhi execution plan " + siddhiRuntime.executionPlanName + " is successfully shutdown ");
 		}
 	}
-	
+
 	@Override
 	public String toString(){
 		return siddhiRuntime.policyDef.toString();
 	}
-	
+
 	public String[] getStreamNames() {
 		return sourceStreams;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
index f42a021..d14cf83 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/siddhi/SiddhiStreamMetadataUtils.java
@@ -35,7 +35,7 @@ public class SiddhiStreamMetadataUtils {
 	public static SortedMap<String, AlertStreamSchemaEntity> getAttrMap(String streamName) {
 		SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName);
 		if(map == null || map.size() == 0){
-			throw new IllegalStateException("alert stream schema should never be empty");
+			throw new IllegalStateException("Alert stream schema should never be empty");
 		}
 		return map;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
index 1b1183c..2671c31 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/eagle.siddhiext
@@ -1 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 externalTimeBatch=org.apache.eagle.policy.siddhi.extension.ExternalTimeBatchWindowProcessor
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/str.siddhiext b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/str.siddhiext
index 963f311..8c1672b 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/str.siddhiext
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/resources/str.siddhiext
@@ -1,11 +1,12 @@
 #
-# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java b/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
index acc8d43..43b3998 100644
--- a/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
+++ b/eagle-core/eagle-query/eagle-storage-jdbc/src/test/java/org/apache/eagle/storage/jdbc/TestJdbcStorage.java
@@ -65,7 +65,7 @@ public class TestJdbcStorage {
         Assert.assertNotNull(result);
     }
 
-    //@Test
+    @Test
     public void testReadByComplexQuery() throws QueryCompileException, IOException {
         RawQuery rawQuery = new RawQuery();
         rawQuery.setQuery("TestTimeSeriesAPIEntity[@cluster=\"cluster\" AND @field4 > 1000 AND @field7 CONTAINS \"subtext\" OR @jobID =\"jobID\" ]{@field1,@field2}");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-samples/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-samples/pom.xml b/eagle-samples/pom.xml
index 48935d4..f9fedc1 100644
--- a/eagle-samples/pom.xml
+++ b/eagle-samples/pom.xml
@@ -1,20 +1,21 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <!--
-  Licensed to the Apache Software Foundation (ASF) under one or more
-  contributor license agreements.  See the NOTICE file distributed with
-  this work for additional information regarding copyright ownership.
-  The ASF licenses this file to You under the Apache License, Version 2.0
-  (the "License"); you may not use this file except in compliance with
-  the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
+<<<<<<< HEAD
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/FileSensitivityAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/FileSensitivityAPIEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/FileSensitivityAPIEntity.java
new file mode 100644
index 0000000..9eef192
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/FileSensitivityAPIEntity.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.entity;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+@JsonSerialize(include =  JsonSerialize.Inclusion.NON_NULL)
+@Table("fileSensitivity")
+@ColumnFamily("f")
+@Prefix("fileSensitivity")
+@Service("FileSensitivityService")
+@TimeSeries(false)
+@Tags({"site", "filedir"})
+public class FileSensitivityAPIEntity extends TaggedLogAPIEntity{
+	/**
+	 * sensitivityType can be multi-value attribute, and values can be separated by "|"
+	 */
+	@Column("a")
+	private String sensitivityType;
+
+	public String getSensitivityType() {
+		return sensitivityType;
+	}
+
+	public void setSensitivityType(String sensitivityType) {
+		this.sensitivityType = sensitivityType;
+		valueChanged("sensitivityType");
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/FileStatusEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/FileStatusEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/FileStatusEntity.java
new file mode 100644
index 0000000..6541c8a
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/FileStatusEntity.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.entity;
+
+import java.io.IOException;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+public class FileStatusEntity {
+    //private Path path;
+    private long length;
+    private boolean isdir;
+    private short block_replication;
+    private long blocksize;
+    private long modification_time;
+
+    private long access_time;
+    private FsPermission permission;
+    private String owner;
+    private String group;
+    private Path symlink;
+    private String resource;
+    private String sensitiveType;
+    private Set<String> childSensitiveTypes;
+
+    public FileStatusEntity(){
+
+    }
+
+    public FileStatusEntity(FileStatus status) throws IOException {
+        //this.path = status.getPath();
+        this.length = status.getLen();
+        this.isdir = status.isDirectory();
+        this.block_replication = status.getReplication();
+        this.blocksize = status.getBlockSize();
+        this.modification_time = status.getModificationTime();
+        this.access_time = status.getAccessTime();
+        this.permission = status.getPermission();
+        this.owner = status.getOwner();
+        this.group = status.getGroup();
+        if(status.isSymlink()) {
+            this.symlink = status.getSymlink();
+        }
+    }
+
+    public long getAccess_time() {
+        return access_time;
+    }
+
+    public String getResource() {
+        return resource;
+    }
+
+    public void setResource(String resource) {
+        this.resource = resource;
+    }
+
+    public void setAccess_time(long access_time) {
+        this.access_time = access_time;
+
+    }
+
+    //public Path getPath() {
+    //    return path;
+    //}
+
+    //public void setPath(Path path) {
+    //    this.path = path;
+    //}
+
+    public long getLength() {
+        return length;
+    }
+
+    public void setLength(long length) {
+        this.length = length;
+    }
+
+    public boolean isIsdir() {
+        return isdir;
+    }
+
+    public void setIsdir(boolean isdir) {
+        this.isdir = isdir;
+    }
+
+    public short getBlock_replication() {
+        return block_replication;
+    }
+
+    public void setBlock_replication(short block_replication) {
+        this.block_replication = block_replication;
+    }
+
+    public long getBlocksize() {
+        return blocksize;
+    }
+
+    public void setBlocksize(long blocksize) {
+        this.blocksize = blocksize;
+    }
+
+    public long getModification_time() {
+        return modification_time;
+    }
+
+    public void setModification_time(long modification_time) {
+        this.modification_time = modification_time;
+    }
+
+    public FsPermission getPermission() {
+        return permission;
+    }
+
+    public void setPermission(FsPermission permission) {
+        this.permission = permission;
+    }
+
+    public String getOwner() {
+        return owner;
+    }
+
+    public void setOwner(String owner) {
+        this.owner = owner;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public void setGroup(String group) {
+        this.group = group;
+    }
+
+    public Path getSymlink() {
+        return symlink;
+    }
+
+    public void setSymlink(Path symlink) {
+        this.symlink = symlink;
+    }
+
+    public String getSensitiveType() {
+        return sensitiveType;
+    }
+
+    public void setSensitiveType(String sensitiveType) {
+        this.sensitiveType = sensitiveType;
+    }
+
+    public Set<String> getChildSensitiveTypes() {
+        return childSensitiveTypes;
+    }
+
+    public void setChildSensitiveTypes(Set<String> childSensitiveTypes) {
+        this.childSensitiveTypes = childSensitiveTypes;
+    }
+    
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HbaseResourceEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HbaseResourceEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HbaseResourceEntity.java
new file mode 100644
index 0000000..deb8364
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HbaseResourceEntity.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.entity;
+
+import com.google.common.base.Objects;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class HbaseResourceEntity implements Serializable {
+    private String resource;
+    private String namespace;
+    private String table;
+    private String columnFamily;
+    private String sensitiveType;
+    private Set<String> childSensitiveTypes;
+
+
+
+    public HbaseResourceEntity(String resource, String ns, String table, String cf, String sensitiveType, Set<String> childSensitiveTypes) {
+        this.resource = resource;
+        this.namespace = ns;
+        this.table = table;
+        this.columnFamily = cf;
+        this.sensitiveType = sensitiveType;
+        this.childSensitiveTypes = childSensitiveTypes;
+    }
+
+    public String getResource() {
+        return resource;
+    }
+
+    public void setResource(String resource) {
+        this.resource = resource;
+    }
+
+    public String getNamespace() {
+        return namespace;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getColumnFamily() {
+        return columnFamily;
+    }
+
+    public void setColumnFamily(String columnFamily) {
+        this.columnFamily = columnFamily;
+    }
+
+    public String getSensitiveType() {
+        return sensitiveType;
+    }
+
+    public void setSensitiveType(String sensitiveType) {
+        this.sensitiveType = sensitiveType;
+    }
+
+    public Set<String> getChildSensitiveTypes() {
+        return childSensitiveTypes;
+    }
+
+    public void setChildSensitiveTypes(Set<String> childSensitiveTypes) {
+        this.childSensitiveTypes = childSensitiveTypes;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final HbaseResourceEntity other = (HbaseResourceEntity) obj;
+        return Objects.equal(this.resource, other.resource)
+                && this.namespace.equals(other.namespace)
+                && this.table.equals(other.table)
+                && this.columnFamily.equals(other.columnFamily)
+                && this.sensitiveType.equals(other.sensitiveType)
+                && this.childSensitiveTypes.containsAll(other.childSensitiveTypes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HbaseResourceSensitivityAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HbaseResourceSensitivityAPIEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HbaseResourceSensitivityAPIEntity.java
new file mode 100644
index 0000000..551967d
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HbaseResourceSensitivityAPIEntity.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.security.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("hbaseResourceSensitivity")
+@ColumnFamily("f")
+@Prefix("hbaseResourceSensitivity")
+@Service("HbaseResourceSensitivityService")
+@TimeSeries(false)
+@Tags({"site", "hbaseResource"})
+public class HbaseResourceSensitivityAPIEntity extends TaggedLogAPIEntity {
+	private static final long serialVersionUID = 2L;
+	/**
+     * sensitivityType can be multi-value attribute, and values can be separated by "|"
+     */
+    @Column("a")
+    private String sensitivityType;
+
+    public String getSensitivityType() {
+        return sensitivityType;
+    }
+
+    public void setSensitivityType(String sensitivityType) {
+        this.sensitivityType = sensitivityType;
+        valueChanged("sensitivityType");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HdfsUserCommandPatternEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HdfsUserCommandPatternEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HdfsUserCommandPatternEntity.java
new file mode 100644
index 0000000..e8c7b72
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HdfsUserCommandPatternEntity.java
@@ -0,0 +1,82 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+
+import java.util.Map;
+
+/**
+ * User command pattern entity to specify Siddhi pattern, field selector and field modifier
+ */
+@Table("hdfsusercommandpattern")
+@ColumnFamily("f")
+@Prefix("hdfsusercommandpattern")
+@Service(HdfsUserCommandPatternEntity.HDFS_USER_COMMAND_PATTERN_SERVICE)
+@TimeSeries(false)
+@Tags({"userCommand"})
+public class HdfsUserCommandPatternEntity extends TaggedLogAPIEntity {
+    public static final String HDFS_USER_COMMAND_PATTERN_SERVICE = "HdfsUserCommandPatternService";
+    @Column("a")
+    private String description;
+    public String getDescription(){
+        return description;
+    }
+    public void setDescription(String description){
+        this.description = description;
+    }
+
+    @Column("b")
+    private String pattern;
+
+    public String getPattern() {
+        return pattern;
+    }
+
+    public void setPattern(String pattern) {
+        this.pattern = pattern;
+        valueChanged("pattern");
+    }
+
+    @Column("c")
+    private Map<String, String> fieldSelector;
+
+    public Map<String, String> getFieldSelector(){
+        return fieldSelector;
+    }
+
+    public void setFieldSelector(Map<String, String> fieldSelector){
+        this.fieldSelector = fieldSelector;
+        valueChanged("fieldSelector");
+    }
+
+    @Column("d")
+    private Map<String, String> fieldModifier;
+
+    public Map<String, String> getFieldModifier(){
+        return fieldModifier;
+    }
+
+    public void setFieldModifier(Map<String, String> fieldModifier){
+        this.fieldModifier = fieldModifier;
+        valueChanged("fieldModifier");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HiveResourceEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HiveResourceEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HiveResourceEntity.java
new file mode 100644
index 0000000..24f1f97
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HiveResourceEntity.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.entity;
+
+import com.google.common.base.Objects;
+
+import java.io.Serializable;
+import java.util.Set;
+
+public class HiveResourceEntity implements Serializable {
+    private String resource;
+    private String database;
+    private String table;
+    private String column;
+    private String sensitiveType;
+    private Set<String> childSensitiveTypes;
+
+
+    public Set<String> getChildSensitiveTypes() {
+        return childSensitiveTypes;
+    }
+
+    public void setChildSensitiveTypes(Set<String> childSensitiveTypes) {
+        this.childSensitiveTypes = childSensitiveTypes;
+    }
+
+    public String getResource() {
+        return resource;
+    }
+
+    public void setResource(String resource) {
+        this.resource = resource;
+    }
+
+    public String getDatabase() {
+        return database;
+    }
+
+    public void setDatabase(String database) {
+        this.database = database;
+    }
+
+    public String getTable() {
+        return table;
+    }
+
+    public void setTable(String table) {
+        this.table = table;
+    }
+
+    public String getColumn() {
+        return column;
+    }
+
+    public void setColumn(String column) {
+        this.column = column;
+    }
+
+    public String getSensitiveType() {
+        return sensitiveType;
+    }
+
+    public void setSensitiveType(String sensitiveType) {
+        this.sensitiveType = sensitiveType;
+    }
+
+
+    public HiveResourceEntity(String resource, String database, String table, String column, String sensitiveType, Set<String> childSensitiveTypes) {
+        this.resource = resource;
+        this.database = database;
+        this.table = table;
+        this.column = column;
+        this.sensitiveType = sensitiveType;
+        this.childSensitiveTypes = childSensitiveTypes;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final HiveResourceEntity other = (HiveResourceEntity) obj;
+        return Objects.equal(this.resource, other.resource)
+                && this.database.equals(other.database)
+                && this.table.equals(other.table)
+                && this.column.equals(other.column)
+                && this.sensitiveType.equals(other.sensitiveType)
+                && this.childSensitiveTypes.containsAll(other.childSensitiveTypes);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HiveResourceSensitivityAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HiveResourceSensitivityAPIEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HiveResourceSensitivityAPIEntity.java
new file mode 100644
index 0000000..aebec8b
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/HiveResourceSensitivityAPIEntity.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.entity;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("hiveResourceSensitivity")
+@ColumnFamily("f")
+@Prefix("hiveResourceSensitivity")
+@Service("HiveResourceSensitivityService")
+@TimeSeries(false)
+@Tags({"site", "hiveResource"})
+public class HiveResourceSensitivityAPIEntity extends TaggedLogAPIEntity {
+	private static final long serialVersionUID = 1L;
+	/**
+     * sensitivityType can be multi-value attribute, and values can be separated by "|"
+     */
+    @Column("a")
+    private String sensitivityType;
+
+    public String getSensitivityType() {
+        return sensitivityType;
+    }
+
+    public void setSensitivityType(String sensitivityType) {
+        this.sensitivityType = sensitivityType;
+        valueChanged("sensitivityType");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/IPZoneEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/IPZoneEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/IPZoneEntity.java
new file mode 100644
index 0000000..f944ace
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/IPZoneEntity.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.entity;
+
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.Column;
+import org.apache.eagle.log.entity.meta.ColumnFamily;
+import org.apache.eagle.log.entity.meta.Prefix;
+import org.apache.eagle.log.entity.meta.Service;
+import org.apache.eagle.log.entity.meta.Table;
+import org.apache.eagle.log.entity.meta.Tags;
+import org.apache.eagle.log.entity.meta.TimeSeries;
+
+/**
+ * ipranger can be single IP, host or ip with subnet etc.  
+ */
+@JsonSerialize(include =  JsonSerialize.Inclusion.NON_NULL)
+@Table("ipzone")
+@ColumnFamily("f")
+@Prefix("ipzone")
+@Service("IPZoneService")
+@TimeSeries(false)
+@Tags({"iphost"})
+public class IPZoneEntity extends TaggedLogAPIEntity{
+	@Column("a")
+	private String securityZone;
+
+	public String getSecurityZone() {
+		return securityZone;
+	}
+
+	public void setSecurityZone(String securityZone) {
+		this.securityZone = securityZone;
+		valueChanged("securityZone");
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/SecurityEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/SecurityEntityRepository.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/SecurityEntityRepository.java
new file mode 100644
index 0000000..a313983
--- /dev/null
+++ b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/entity/SecurityEntityRepository.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.entity;
+
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+
/**
 * Registers all eagle-security tagged-log entity classes with the Eagle
 * entity framework.
 *
 * NOTE(review): presumably each add() makes the entity's @Service
 * discoverable at runtime — confirm against EntityRepository. The plain
 * POJOs (HbaseResourceEntity, HiveResourceEntity, FileStatusEntity) are
 * intentionally absent: they do not extend TaggedLogAPIEntity.
 */
public class SecurityEntityRepository extends EntityRepository {
    public SecurityEntityRepository() {
        // entitySet is inherited from EntityRepository.
        entitySet.add(HbaseResourceSensitivityAPIEntity.class);
        entitySet.add(FileSensitivityAPIEntity.class);
        entitySet.add(IPZoneEntity.class);
        entitySet.add(HdfsUserCommandPatternEntity.class);
        entitySet.add(HiveResourceSensitivityAPIEntity.class);
    }
}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseResourceEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseResourceEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseResourceEntity.java
deleted file mode 100644
index 379a52a..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseResourceEntity.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.security.hbase;
-
-import com.google.common.base.Objects;
-
-import java.io.Serializable;
-import java.util.Set;
-
-public class HbaseResourceEntity implements Serializable {
-    private String resource;
-    private String namespace;
-    private String table;
-    private String columnFamily;
-    private String sensitiveType;
-    private Set<String> childSensitiveTypes;
-
-
-
-    public HbaseResourceEntity(String resource, String ns, String table, String cf, String sensitiveType, Set<String> childSensitiveTypes) {
-        this.resource = resource;
-        this.namespace = ns;
-        this.table = table;
-        this.columnFamily = cf;
-        this.sensitiveType = sensitiveType;
-        this.childSensitiveTypes = childSensitiveTypes;
-    }
-
-    public String getResource() {
-        return resource;
-    }
-
-    public void setResource(String resource) {
-        this.resource = resource;
-    }
-
-    public String getNamespace() {
-        return namespace;
-    }
-
-    public void setNamespace(String namespace) {
-        this.namespace = namespace;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public void setTable(String table) {
-        this.table = table;
-    }
-
-    public String getColumnFamily() {
-        return columnFamily;
-    }
-
-    public void setColumnFamily(String columnFamily) {
-        this.columnFamily = columnFamily;
-    }
-
-    public String getSensitiveType() {
-        return sensitiveType;
-    }
-
-    public void setSensitiveType(String sensitiveType) {
-        this.sensitiveType = sensitiveType;
-    }
-
-    public Set<String> getChildSensitiveTypes() {
-        return childSensitiveTypes;
-    }
-
-    public void setChildSensitiveTypes(Set<String> childSensitiveTypes) {
-        this.childSensitiveTypes = childSensitiveTypes;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final HbaseResourceEntity other = (HbaseResourceEntity) obj;
-        return Objects.equal(this.resource, other.resource)
-                && this.namespace.equals(other.namespace)
-                && this.table.equals(other.table)
-                && this.columnFamily.equals(other.columnFamily)
-                && this.sensitiveType.equals(other.sensitiveType)
-                && this.childSensitiveTypes.containsAll(other.childSensitiveTypes);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityAPIEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityAPIEntity.java
deleted file mode 100644
index 0adfeb4..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseResourceSensitivityAPIEntity.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.security.hbase;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@Table("hbaseResourceSensitivity")
-@ColumnFamily("f")
-@Prefix("hbaseResourceSensitivity")
-@Service("HbaseResourceSensitivityService")
-@TimeSeries(false)
-@Tags({"site", "hbaseResource"})
-public class HbaseResourceSensitivityAPIEntity extends TaggedLogAPIEntity {
-	private static final long serialVersionUID = 2L;
-	/**
-     * sensitivityType can be multi-value attribute, and values can be separated by "|"
-     */
-    @Column("a")
-    private String sensitivityType;
-
-    public String getSensitivityType() {
-        return sensitivityType;
-    }
-
-    public void setSensitivityType(String sensitivityType) {
-        this.sensitivityType = sensitivityType;
-        valueChanged("sensitivityType");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseSecurityEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseSecurityEntityRepository.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseSecurityEntityRepository.java
deleted file mode 100644
index 8777c20..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hbase/HbaseSecurityEntityRepository.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.security.hbase;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-public class HbaseSecurityEntityRepository extends EntityRepository {
-	public HbaseSecurityEntityRepository() {
-		entitySet.add(HbaseResourceSensitivityAPIEntity.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java
deleted file mode 100644
index 00b6ca7..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileSensitivityAPIEntity.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.hdfs.entity;
-
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.Column;
-import org.apache.eagle.log.entity.meta.ColumnFamily;
-import org.apache.eagle.log.entity.meta.Prefix;
-import org.apache.eagle.log.entity.meta.Service;
-import org.apache.eagle.log.entity.meta.Table;
-import org.apache.eagle.log.entity.meta.Tags;
-import org.apache.eagle.log.entity.meta.TimeSeries;
-
-@JsonSerialize(include =  JsonSerialize.Inclusion.NON_NULL)
-@Table("fileSensitivity")
-@ColumnFamily("f")
-@Prefix("fileSensitivity")
-@Service("FileSensitivityService")
-@TimeSeries(false)
-@Tags({"site", "filedir"})
-public class FileSensitivityAPIEntity extends TaggedLogAPIEntity{
-	/**
-	 * sensitivityType can be multi-value attribute, and values can be separated by "|"
-	 */
-	@Column("a")
-	private String sensitivityType;
-
-	public String getSensitivityType() {
-		return sensitivityType;
-	}
-
-	public void setSensitivityType(String sensitivityType) {
-		this.sensitivityType = sensitivityType;
-		valueChanged("sensitivityType");
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileStatusEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileStatusEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileStatusEntity.java
deleted file mode 100644
index 101ac5a..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/FileStatusEntity.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.hdfs.entity;
-
-import java.io.IOException;
-import java.util.Set;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-
-public class FileStatusEntity {
-    //private Path path;
-    private long length;
-    private boolean isdir;
-    private short block_replication;
-    private long blocksize;
-    private long modification_time;
-
-    private long access_time;
-    private FsPermission permission;
-    private String owner;
-    private String group;
-    private Path symlink;
-    private String resource;
-    private String sensitiveType;
-    private Set<String> childSensitiveTypes;
-
-    public FileStatusEntity(){
-
-    }
-
-    public FileStatusEntity(FileStatus status) throws IOException {
-        //this.path = status.getPath();
-        this.length = status.getLen();
-        this.isdir = status.isDirectory();
-        this.block_replication = status.getReplication();
-        this.blocksize = status.getBlockSize();
-        this.modification_time = status.getModificationTime();
-        this.access_time = status.getAccessTime();
-        this.permission = status.getPermission();
-        this.owner = status.getOwner();
-        this.group = status.getGroup();
-        if(status.isSymlink()) {
-            this.symlink = status.getSymlink();
-        }
-    }
-
-    public long getAccess_time() {
-        return access_time;
-    }
-
-    public String getResource() {
-        return resource;
-    }
-
-    public void setResource(String resource) {
-        this.resource = resource;
-    }
-
-    public void setAccess_time(long access_time) {
-        this.access_time = access_time;
-
-    }
-
-    //public Path getPath() {
-    //    return path;
-    //}
-
-    //public void setPath(Path path) {
-    //    this.path = path;
-    //}
-
-    public long getLength() {
-        return length;
-    }
-
-    public void setLength(long length) {
-        this.length = length;
-    }
-
-    public boolean isIsdir() {
-        return isdir;
-    }
-
-    public void setIsdir(boolean isdir) {
-        this.isdir = isdir;
-    }
-
-    public short getBlock_replication() {
-        return block_replication;
-    }
-
-    public void setBlock_replication(short block_replication) {
-        this.block_replication = block_replication;
-    }
-
-    public long getBlocksize() {
-        return blocksize;
-    }
-
-    public void setBlocksize(long blocksize) {
-        this.blocksize = blocksize;
-    }
-
-    public long getModification_time() {
-        return modification_time;
-    }
-
-    public void setModification_time(long modification_time) {
-        this.modification_time = modification_time;
-    }
-
-    public FsPermission getPermission() {
-        return permission;
-    }
-
-    public void setPermission(FsPermission permission) {
-        this.permission = permission;
-    }
-
-    public String getOwner() {
-        return owner;
-    }
-
-    public void setOwner(String owner) {
-        this.owner = owner;
-    }
-
-    public String getGroup() {
-        return group;
-    }
-
-    public void setGroup(String group) {
-        this.group = group;
-    }
-
-    public Path getSymlink() {
-        return symlink;
-    }
-
-    public void setSymlink(Path symlink) {
-        this.symlink = symlink;
-    }
-
-    public String getSensitiveType() {
-        return sensitiveType;
-    }
-
-    public void setSensitiveType(String sensitiveType) {
-        this.sensitiveType = sensitiveType;
-    }
-
-    public Set<String> getChildSensitiveTypes() {
-        return childSensitiveTypes;
-    }
-
-    public void setChildSensitiveTypes(Set<String> childSensitiveTypes) {
-        this.childSensitiveTypes = childSensitiveTypes;
-    }
-    
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/HDFSSecurityEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/HDFSSecurityEntityRepository.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/HDFSSecurityEntityRepository.java
deleted file mode 100644
index 35c3578..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/HDFSSecurityEntityRepository.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.eagle.security.hdfs.entity;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-public class HDFSSecurityEntityRepository extends EntityRepository {
-	public HDFSSecurityEntityRepository(){
-		entitySet.add(FileSensitivityAPIEntity.class);
-        entitySet.add(IPZoneEntity.class);
-        entitySet.add(HdfsUserCommandPatternEntity.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/HdfsUserCommandPatternEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/HdfsUserCommandPatternEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/HdfsUserCommandPatternEntity.java
deleted file mode 100644
index d35719f..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/HdfsUserCommandPatternEntity.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.security.hdfs.entity;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.*;
-
-import java.util.Map;
-
-/**
- * User command pattern entity to specify Siddhi pattern, field selector and field modifier
- */
-@Table("hdfsusercommandpattern")
-@ColumnFamily("f")
-@Prefix("hdfsusercommandpattern")
-@Service(HdfsUserCommandPatternEntity.HDFS_USER_COMMAND_PATTERN_SERVICE)
-@TimeSeries(false)
-@Tags({"userCommand"})
-public class HdfsUserCommandPatternEntity extends TaggedLogAPIEntity {
-    public static final String HDFS_USER_COMMAND_PATTERN_SERVICE = "HdfsUserCommandPatternService";
-    @Column("a")
-    private String description;
-    public String getDescription(){
-        return description;
-    }
-    public void setDescription(String description){
-        this.description = description;
-    }
-
-    @Column("b")
-    private String pattern;
-
-    public String getPattern() {
-        return pattern;
-    }
-
-    public void setPattern(String pattern) {
-        this.pattern = pattern;
-        valueChanged("pattern");
-    }
-
-    @Column("c")
-    private Map<String, String> fieldSelector;
-
-    public Map<String, String> getFieldSelector(){
-        return fieldSelector;
-    }
-
-    public void setFieldSelector(Map<String, String> fieldSelector){
-        this.fieldSelector = fieldSelector;
-        valueChanged("fieldSelector");
-    }
-
-    @Column("d")
-    private Map<String, String> fieldModifier;
-
-    public Map<String, String> getFieldModifier(){
-        return fieldModifier;
-    }
-
-    public void setFieldModifier(Map<String, String> fieldModifier){
-        this.fieldModifier = fieldModifier;
-        valueChanged("fieldModifier");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/IPZoneEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/IPZoneEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/IPZoneEntity.java
deleted file mode 100644
index c60724b..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hdfs/entity/IPZoneEntity.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.hdfs.entity;
-
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.Column;
-import org.apache.eagle.log.entity.meta.ColumnFamily;
-import org.apache.eagle.log.entity.meta.Prefix;
-import org.apache.eagle.log.entity.meta.Service;
-import org.apache.eagle.log.entity.meta.Table;
-import org.apache.eagle.log.entity.meta.Tags;
-import org.apache.eagle.log.entity.meta.TimeSeries;
-
-/**
- * ipranger can be single IP, host or ip with subnet etc.  
- */
-@JsonSerialize(include =  JsonSerialize.Inclusion.NON_NULL)
-@Table("ipzone")
-@ColumnFamily("f")
-@Prefix("ipzone")
-@Service("IPZoneService")
-@TimeSeries(false)
-@Tags({"iphost"})
-public class IPZoneEntity extends TaggedLogAPIEntity{
-	@Column("a")
-	private String securityZone;
-
-	public String getSecurityZone() {
-		return securityZone;
-	}
-
-	public void setSecurityZone(String securityZone) {
-		this.securityZone = securityZone;
-		valueChanged("securityZone");
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveResourceEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveResourceEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveResourceEntity.java
deleted file mode 100644
index f0e0435..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveResourceEntity.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.hive.entity;
-
-import com.google.common.base.Objects;
-
-import java.io.Serializable;
-import java.util.Set;
-
-public class HiveResourceEntity implements Serializable {
-    private String resource;
-    private String database;
-    private String table;
-    private String column;
-    private String sensitiveType;
-    private Set<String> childSensitiveTypes;
-
-
-    public Set<String> getChildSensitiveTypes() {
-        return childSensitiveTypes;
-    }
-
-    public void setChildSensitiveTypes(Set<String> childSensitiveTypes) {
-        this.childSensitiveTypes = childSensitiveTypes;
-    }
-
-    public String getResource() {
-        return resource;
-    }
-
-    public void setResource(String resource) {
-        this.resource = resource;
-    }
-
-    public String getDatabase() {
-        return database;
-    }
-
-    public void setDatabase(String database) {
-        this.database = database;
-    }
-
-    public String getTable() {
-        return table;
-    }
-
-    public void setTable(String table) {
-        this.table = table;
-    }
-
-    public String getColumn() {
-        return column;
-    }
-
-    public void setColumn(String column) {
-        this.column = column;
-    }
-
-    public String getSensitiveType() {
-        return sensitiveType;
-    }
-
-    public void setSensitiveType(String sensitiveType) {
-        this.sensitiveType = sensitiveType;
-    }
-
-
-    public HiveResourceEntity(String resource, String database, String table, String column, String sensitiveType, Set<String> childSensitiveTypes) {
-        this.resource = resource;
-        this.database = database;
-        this.table = table;
-        this.column = column;
-        this.sensitiveType = sensitiveType;
-        this.childSensitiveTypes = childSensitiveTypes;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final HiveResourceEntity other = (HiveResourceEntity) obj;
-        return Objects.equal(this.resource, other.resource)
-                && this.database.equals(other.database)
-                && this.table.equals(other.table)
-                && this.column.equals(other.column)
-                && this.sensitiveType.equals(other.sensitiveType)
-                && this.childSensitiveTypes.containsAll(other.childSensitiveTypes);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveResourceSensitivityAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveResourceSensitivityAPIEntity.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveResourceSensitivityAPIEntity.java
deleted file mode 100644
index 6086908..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveResourceSensitivityAPIEntity.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.hive.entity;
-
-import org.codehaus.jackson.map.annotate.JsonSerialize;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.meta.Column;
-import org.apache.eagle.log.entity.meta.ColumnFamily;
-import org.apache.eagle.log.entity.meta.Prefix;
-import org.apache.eagle.log.entity.meta.Service;
-import org.apache.eagle.log.entity.meta.Table;
-import org.apache.eagle.log.entity.meta.Tags;
-import org.apache.eagle.log.entity.meta.TimeSeries;
-
-@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
-@Table("hiveResourceSensitivity")
-@ColumnFamily("f")
-@Prefix("hiveResourceSensitivity")
-@Service("HiveResourceSensitivityService")
-@TimeSeries(false)
-@Tags({"site", "hiveResource"})
-public class HiveResourceSensitivityAPIEntity extends TaggedLogAPIEntity {
-	private static final long serialVersionUID = 1L;
-	/**
-     * sensitivityType can be multi-value attribute, and values can be separated by "|"
-     */
-    @Column("a")
-    private String sensitivityType;
-
-    public String getSensitivityType() {
-        return sensitivityType;
-    }
-
-    public void setSensitivityType(String sensitivityType) {
-        this.sensitivityType = sensitivityType;
-        valueChanged("sensitivityType");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveSecurityEntityRepository.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveSecurityEntityRepository.java b/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveSecurityEntityRepository.java
deleted file mode 100644
index 96164b5..0000000
--- a/eagle-security/eagle-security-common/src/main/java/org/apache/eagle/security/hive/entity/HiveSecurityEntityRepository.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.hive.entity;
-
-import org.apache.eagle.log.entity.repo.EntityRepository;
-
-public class HiveSecurityEntityRepository extends EntityRepository {
-	public HiveSecurityEntityRepository() {
-		entitySet.add(HiveResourceSensitivityAPIEntity.class);
-	}
-}



[3/4] incubator-eagle git commit: EAGLE-130 Eagle Pipeline DSL: Parser, Compiler, Runner

Posted by ha...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
new file mode 100644
index 0000000..e63280a
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/DataFlowSpec.scala
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline
+
+import com.typesafe.config.ConfigFactory
+import org.apache.eagle.stream.pipeline.parser.{ConnectionIdentifier, DataFlow, DefinitionIdentifier, Identifier}
+import org.scalatest.{FlatSpec, Matchers}
+
+class DataFlowSpec extends FlatSpec with Matchers {
+  val dataFlowConfig =
+    """
+       |{
+       |	kafkaSource.metric_event_1 {
+       |    schema {
+       |      metric: string
+       |      timestamp: long
+       |      value: double
+       |    }
+       |		parallism = 1000
+       |		topic = "metric_event_1"
+       |		zkConnection = "localhost:2181"
+       |		zkConnectionTimeoutMS = 15000
+       |		consumerGroupId = "Consumer"
+       |		fetchSize = 1048586
+       |		transactionZKServers = "localhost"
+       |		transactionZKPort = 2181
+       |		transactionZKRoot = "/consumers"
+       |		transactionStateUpdateMS = 2000
+       |		deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+       |	}
+       |
+       |	kafkaSource.metric_event_2 {
+       |		schema = {
+       |      metric: string
+       |      timestamp: long
+       |      value: double
+       |    }
+       |		parallism = 1000
+       |		topic = "metric_event_2"
+       |		zkConnection = "localhost:2181"
+       |		zkConnectionTimeoutMS = 15000
+       |		consumerGroupId = "Consumer"
+       |		fetchSize = 1048586
+       |		transactionZKServers = "localhost"
+       |		transactionZKPort = 2181
+       |		transactionZKRoot = "/consumers"
+       |		transactionStateUpdateMS = 2000
+       |		deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+       |	}
+       |
+       |	kafkaSink.metricStore {}
+       |
+       |	alert.alert {
+       |		executor = "alertExecutor"
+       |	}
+       |
+       |	aggregator.aggreator {
+       |		executor = "aggreationExecutor"
+       |	}
+       |
+       |	metric_event_1|metric_event_2 -> alert {}
+       |	metric_event_1|metric_event_2 -> metricStore {}
+       |}
+     """.stripMargin
+
+  DataFlow.getClass.toString should "parse dataflow end-to-end correctly" in {
+    val config = ConfigFactory.parseString(dataFlowConfig)
+    config should not be null
+    val dataflow = DataFlow.parse(config)
+    dataflow should not be null
+    dataflow.getConnectors.size should be(4)
+    dataflow.getProcessors.size should be(5)
+  }
+
+  Identifier.getClass.toString should "parse as definition" in {
+    val defId = Identifier.parse("kafka").asInstanceOf[DefinitionIdentifier]
+    defId.moduleType should be("kafka")
+  }
+
+  Identifier.getClass.toString should "parse node1 -> node2 as connection" in {
+    val id = Identifier.parse("node1 -> node2").asInstanceOf[ConnectionIdentifier]
+    id.fromIds.size should be(1)
+  }
+
+  Identifier.getClass.toString should "parse node1|node2 -> node3" in {
+    val id = Identifier.parse("node1|node2 -> node3").asInstanceOf[ConnectionIdentifier]
+    id.fromIds.size should be(2)
+  }
+
+  Identifier.getClass.toString should "parse node1|node2|node3 -> node4 as connection" in {
+    val id = Identifier.parse("node1|node2|node3 -> node4").asInstanceOf[ConnectionIdentifier]
+    id.fromIds.size should be(3)
+  }
+
+  Identifier.getClass.toString should "parse node1 | node2 | node3 -> node4 as connection" in {
+    val id = Identifier.parse("node1 | node2 | node3 -> node4").asInstanceOf[ConnectionIdentifier]
+    id.fromIds.size should be(3)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
new file mode 100644
index 0000000..c87475b
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/PipelineSpec.scala
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline
+
+import org.apache.eagle.datastream.ExecutionEnvironments.storm
+import org.scalatest.{FlatSpec, Matchers}
+
+class PipelineSpec extends FlatSpec with Matchers{
+  "Pipeline" should "parse successfully from pipeline_1.conf" in {
+    val pipeline = Pipeline.parseResource("pipeline_1.conf")
+    pipeline should not be null
+  }
+
+  "Pipeline" should "compile successfully from pipeline_2.conf" in {
+    val pipeline = Pipeline.parseResource("pipeline_2.conf")
+    pipeline should not be null
+    val stream = Pipeline.compile(pipeline)
+    stream should not be null
+    // Throw ClassNotFoundException when submit in unit test
+    // stream.submit[storm]
+  }
+}
+
+/**
+ * Storm LocalCluster throws ClassNotFoundException when submit in unit test, so here submit in App
+ */
+object PipelineSpec_2 extends App{
+  val pipeline = Pipeline(args).parseResource("pipeline_2.conf")
+  val stream = Pipeline.compile(pipeline)
+  stream.submit[storm]
+}
+
+object PipelineSpec_3 extends App {
+  Pipeline(args).submit[storm]("pipeline_3.conf")
+}
+
+object PipelineSpec_4 extends App {
+  Pipeline(args).submit[storm]("pipeline_4.conf")
+}
+
+object PipelineCLISpec extends App{
+  Pipeline.main(args)
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
index e66d3fb..48c832a 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/AggregateExecutorFactory.java
@@ -58,11 +58,8 @@ public class AggregateExecutorFactory {
 	public IPolicyExecutor[] createExecutors(Config config, List<String> streamNames, String executorId) throws Exception {
 		StringBuilder partitionerCls = new StringBuilder(DefaultPolicyPartitioner.class.getCanonicalName());
         int numPartitions = loadExecutorConfig(config, executorId, partitionerCls);
-        
-		PolicyDefinitionDAO<AggregateDefinitionAPIEntity> policyDefDao = new PolicyDefinitionEntityDAOImpl<AggregateDefinitionAPIEntity>(
+		PolicyDefinitionDAO<AggregateDefinitionAPIEntity> policyDefDao = new PolicyDefinitionEntityDAOImpl<>(
 				new EagleServiceConnector(config), Constants.AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME);
-		
-		
 		return newAggregateExecutors(policyDefDao, streamNames, executorId, numPartitions, partitionerCls.toString());
 	}
 	
@@ -86,7 +83,6 @@ public class AggregateExecutorFactory {
         return numPartitions;
 	}
 
-
 //	private List<String> findStreamNames(Config config, String executorId, String dataSource) throws Exception {
 //		// Get map from alertExecutorId to alert stream
 //		// (dataSource) => Map[alertExecutorId:String,streamName:List[String]]
@@ -104,12 +100,12 @@ public class AggregateExecutorFactory {
 	private AggregateExecutor[] newAggregateExecutors(PolicyDefinitionDAO<AggregateDefinitionAPIEntity> alertDefDAO,
 			List<String> sourceStreams, String executorID, int numPartitions, String partitionerCls)
 					throws Exception {
-		LOG.info("Creating alert executors with executorID: " + executorID + ", numPartitions: "
+		LOG.info("Creating aggregator executors with executorID: " + executorID + ", numPartitions: "
 				+ numPartitions + ", Partition class is: " + partitionerCls);
 
 		PolicyPartitioner partitioner = (PolicyPartitioner) Class.forName(partitionerCls).newInstance();
 		AggregateExecutor[] alertExecutors = new AggregateExecutor[numPartitions];
-		String[] _sourceStreams = sourceStreams.toArray(new String[0]);
+		String[] _sourceStreams = sourceStreams.toArray(new String[sourceStreams.size()]);
 
 		for (int i = 0; i < numPartitions; i++) {
 			alertExecutors[i] = new AggregateExecutor(executorID, partitioner, numPartitions, i, alertDefDAO,
@@ -117,5 +113,4 @@ public class AggregateExecutorFactory {
 		}
 		return alertExecutors;
 	}
-
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
index 8c01935..e42fc48 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/aggregate/SimpleAggregateExecutor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.dataproc.impl.aggregate;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
 import org.apache.eagle.dataproc.core.ValuesArray;
@@ -38,6 +39,7 @@ import org.slf4j.LoggerFactory;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Created on 1/10/16.
@@ -68,10 +70,13 @@ public class SimpleAggregateExecutor
         aggDef.getTags().put(Constants.POLICY_TYPE, policyType);
         // TODO make it more general, not only hard code siddhi cep support here.
         try {
-            String template = "{\"type\":\"siddhiCEPEngine\", \"expression\":\"%s\", \"containsDefintion\": true }";
-            aggDef.setPolicyDef(String.format(template, this.cql));
+            Map<String,Object> template = new HashMap<>();
+            template.put("type","siddhiCEPEngine");
+            template.put("expression",this.cql);
+            template.put("containsDefinition",true);
+            aggDef.setPolicyDef(new ObjectMapper().writer().writeValueAsString(template));
         } catch (Exception e) {
-            LOG.error("simple aggregate generate policy definition failed!", e);
+            LOG.error("Simple aggregate generate policy definition failed!", e);
         }
         aggDef.setCreatedTime(new Date().getTime());
         aggDef.setLastModifiedDate(new Date().getTime());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
new file mode 100644
index 0000000..416aaa3
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/JsonSerializer.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.dataproc.impl.storm.kafka;
+
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.SerializationConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class JsonSerializer implements Serializer<Object> {
+    private final StringSerializer stringSerializer = new StringSerializer();
+    private static final Logger logger = LoggerFactory.getLogger(JsonSerializer.class);
+    private static final ObjectMapper om = new ObjectMapper();
+
+    static {
+        om.configure(SerializationConfig.Feature.WRITE_DATES_AS_TIMESTAMPS, true);
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        stringSerializer.configure(configs,isKey);
+    }
+
+    @Override
+    public byte[] serialize(String topic, Object data) {
+        String str = null;
+        try {
+            str = om.writeValueAsString(data);
+        } catch (IOException e) {
+            logger.error("Kafka serialization for send error!", e);
+        }
+        return stringSerializer.serialize(topic, str);
+    }
+
+    @Override
+    public void close() {
+        stringSerializer.close();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
index 53537d7..0107a9b 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutProvider.java
@@ -37,26 +37,36 @@ public class KafkaSourcedSpoutProvider implements StormSpoutProvider {
 		return new SchemeAsMultiScheme(new KafkaSourcedSpoutScheme(deserClsName, context));
 	}
 
+    private String configPrefix = "dataSourceConfig";
+
+    public KafkaSourcedSpoutProvider(){}
+
+    public KafkaSourcedSpoutProvider(String prefix){
+        this.configPrefix = prefix;
+    }
+
 	@Override
-	public BaseRichSpout getSpout(Config context){
+	public BaseRichSpout getSpout(Config config){
+        Config context = config;
+        if(this.configPrefix!=null) context = config.getConfig(configPrefix);
 		// Kafka topic
-		String topic = context.getString("dataSourceConfig.topic");
+		String topic = context.getString("topic");
 		// Kafka consumer group id
-		String groupId = context.getString("dataSourceConfig.consumerGroupId");
+		String groupId = context.getString("consumerGroupId");
 		// Kafka fetch size
-		int fetchSize = context.getInt("dataSourceConfig.fetchSize");
+		int fetchSize = context.getInt("fetchSize");
 		// Kafka deserializer class
-		String deserClsName = context.getString("dataSourceConfig.deserializerClass");
+		String deserClsName = context.getString("deserializerClass");
 		// Kafka broker zk connection
-		String zkConnString = context.getString("dataSourceConfig.zkConnection");
+		String zkConnString = context.getString("zkConnection");
 		// transaction zkRoot
-		String zkRoot = context.getString("dataSourceConfig.transactionZKRoot");
+		String zkRoot = context.getString("transactionZKRoot");
 
         LOG.info(String.format("Use topic id: %s",topic));
 
         String brokerZkPath = null;
-        if(context.hasPath("dataSourceConfig.brokerZkPath")) {
-            brokerZkPath = context.getString("dataSourceConfig.brokerZkPath");
+        if(context.hasPath("brokerZkPath")) {
+            brokerZkPath = context.getString("brokerZkPath");
         }
 
         BrokerHosts hosts;
@@ -72,20 +82,20 @@ public class KafkaSourcedSpoutProvider implements StormSpoutProvider {
 				groupId);
 		
 		// transaction zkServers
-		spoutConfig.zkServers = Arrays.asList(context.getString("dataSourceConfig.transactionZKServers").split(","));
+		spoutConfig.zkServers = Arrays.asList(context.getString("transactionZKServers").split(","));
 		// transaction zkPort
-		spoutConfig.zkPort = context.getInt("dataSourceConfig.transactionZKPort");
+		spoutConfig.zkPort = context.getInt("transactionZKPort");
 		// transaction update interval
-		spoutConfig.stateUpdateIntervalMs = context.getLong("dataSourceConfig.transactionStateUpdateMS");
+		spoutConfig.stateUpdateIntervalMs = context.getLong("transactionStateUpdateMS");
 		// Kafka fetch size
 		spoutConfig.fetchSizeBytes = fetchSize;		
 		// "startOffsetTime" is for test usage, prod should not use this
-		if (context.hasPath("dataSourceConfig.startOffsetTime")) {
-			spoutConfig.startOffsetTime = context.getInt("dataSourceConfig.startOffsetTime");
+		if (context.hasPath("startOffsetTime")) {
+			spoutConfig.startOffsetTime = context.getInt("startOffsetTime");
 		}		
 		// "forceFromStart" is for test usage, prod should not use this 
-		if (context.hasPath("dataSourceConfig.forceFromStart")) {
-			spoutConfig.forceFromStart = context.getBoolean("dataSourceConfig.forceFromStart");
+		if (context.hasPath("forceFromStart")) {
+			spoutConfig.forceFromStart = context.getBoolean("forceFromStart");
 		}
 		
 		spoutConfig.scheme = getStreamScheme(deserClsName, context);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
index faeb7c3..15401fd 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/java/org/apache/eagle/dataproc/impl/storm/kafka/KafkaSourcedSpoutScheme.java
@@ -38,7 +38,7 @@ public class KafkaSourcedSpoutScheme implements Scheme {
 	public KafkaSourcedSpoutScheme(String deserClsName, Config context){
 		try{
 			Properties prop = new Properties();
-            if(context.getObject("eagleProps") != null) {
+            if(context.hasPath("eagleProps")) {
                 prop.putAll(context.getObject("eagleProps"));
             }
 			Constructor<?> constructor =  Class.forName(deserClsName).getConstructor(Properties.class);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
deleted file mode 100644
index 7e5f271..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironment.scala
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.datastream
-
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-
-import scala.reflect.runtime.universe._
-
-/**
- * Execution environment factory
- *
- * The factory is mainly used for create or manage execution environment,
- * and also handles the shared works like configuration, arguments for execution environment
- *
- * Notice: this factory class should not know any implementation like storm or spark
- *
- * @since 0.3.0
- */
-object ExecutionEnvironments{
-  /**
-   * Use `'''get[StormExecutionEnvironment](config)'''` instead
-   *
-   * @param config
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(config : Config) = new StormExecutionEnvironment(config)
-
-  /**
-   * Use `'''get[StormExecutionEnvironment]'''` instead
-   *
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm:StormExecutionEnvironment = {
-    val config = ConfigFactory.load()
-    getStorm(config)
-  }
-
-  /**
-   * Use `'''get[StormExecutionEnvironment](args)'''` instead
-   *
-   * @see get[StormExecutionEnvironment](args)
-   *
-   * @param args
-   * @return
-   */
-  @deprecated("Execution environment should not know implementation of Storm")
-  def getStorm(args:Array[String]):StormExecutionEnvironment = {
-    getStorm(new ConfigOptionParser().load(args))
-  }
-
-  /**
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](implicit typeTag: TypeTag[T]): T ={
-    get[T](ConfigFactory.load())
-  }
-
-  /**
-   *
-   * @param config
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](config:Config)(implicit typeTag: TypeTag[T]): T ={
-    typeTag.mirror.runtimeClass(typeOf[T]).getConstructor(classOf[Config]).newInstance(config).asInstanceOf[T]
-  }
-
-  /**
-   *
-   * @param args
-   * @param typeTag
-   * @tparam T
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String])(implicit typeTag: TypeTag[T]): T ={
-    get[T](new ConfigOptionParser().load(args))
-  }
-
-  /**
-   * Support java style for default config
-   *
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](clazz:Class[T]):T ={
-    get[T](ConfigFactory.load(),clazz)
-  }
-
-  /**
-   * Support java style
-   * @param config command config
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](config:Config,clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(config)
-  }
-
-  /**
-   * Support java style
-   *
-   * @param args command arguments in string array
-   * @param clazz execution environment class
-   * @tparam T execution environment type
-   * @return
-   */
-  def get[T<:ExecutionEnvironment](args:Array[String],clazz:Class[T]):T ={
-    clazz.getConstructor(classOf[Config]).newInstance(new ConfigOptionParser().load(args))
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
new file mode 100644
index 0000000..90e59cf
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/ExecutionEnvironments.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.dataproc.util.ConfigOptionParser
+import org.apache.eagle.datastream.core._
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
+
+import scala.reflect.runtime.universe._
+
+/**
+ * Execution environment factory
+ *
+ * The factory is mainly used for create or manage execution environment,
+ * and also handles the shared works like configuration, arguments for execution environment
+ *
+ * Notice: this factory class should not know any implementation like storm or spark
+ *
+ * @since 0.3.0
+ */
+object ExecutionEnvironments{
+  type storm = StormExecutionEnvironment
+
+  /**
+   * Use `'''get[StormExecutionEnvironment](config)'''` instead
+   *
+   * @param config
+   * @return
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm(config : Config) = new StormExecutionEnvironment(config)
+
+  /**
+   * Use `'''get[StormExecutionEnvironment]'''` instead
+   *
+   * @return
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm:StormExecutionEnvironment = {
+    val config = ConfigFactory.load()
+    getStorm(config)
+  }
+
+  /**
+   * Use `'''get[StormExecutionEnvironment](args)'''` instead
+   *
+   * @see get[StormExecutionEnvironment](args)
+    * @param args
+   * @return
+   */
+  @deprecated("Execution environment should not know implementation of Storm")
+  def getStorm(args:Array[String]):StormExecutionEnvironment = {
+    getStorm(new ConfigOptionParser().load(args))
+  }
+
+  /**
+   * @param typeTag
+   * @tparam T
+   * @return
+   */
+  def get[T<:ExecutionEnvironment](implicit typeTag: TypeTag[T]): T ={
+    getWithConfig[T](ConfigFactory.load())
+  }
+
+  /**
+   *
+   * @param config
+   * @param typeTag
+   * @tparam T
+   * @return
+   */
+  def getWithConfig[T <: ExecutionEnvironment](config:Config)(implicit typeTag: TypeTag[T]): T ={
+    typeTag.mirror.runtimeClass(typeOf[T]).getConstructor(classOf[Config]).newInstance(config).asInstanceOf[T]
+  }
+
+  /**
+   *
+   * @param args
+   * @param typeTag
+   * @tparam T
+   * @return
+   */
+  def get[T<:ExecutionEnvironment](args:Array[String])(implicit typeTag: TypeTag[T]): T ={
+    getWithConfig[T](new ConfigOptionParser().load(args))
+  }
+
+  /**
+   * Support java style for default config
+   *
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return
+   */
+  def get[T<:ExecutionEnvironment](clazz:Class[T]):T ={
+    get[T](ConfigFactory.load(),clazz)
+  }
+
+  def get[T<:ExecutionEnvironment](clazz:Class[T], config:Config):T ={
+    get[T](config,clazz)
+  }
+
+  /**
+   * Support java style
+    *
+    * @param config command config
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return
+   */
+  def get[T<:ExecutionEnvironment](config:Config,clazz:Class[T]):T ={
+    clazz.getConstructor(classOf[Config]).newInstance(config)
+  }
+
+  /**
+   * Support java style
+   *
+   * @param args command arguments in string array
+   * @param clazz execution environment class
+   * @tparam T execution environment type
+   * @return
+   */
+  def get[T<:ExecutionEnvironment](args:Array[String],clazz:Class[T]):T ={
+    clazz.getConstructor(classOf[Config]).newInstance(new ConfigOptionParser().load(args))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
index b189c57..c511484 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/ExecutionEnvironment.scala
@@ -19,64 +19,27 @@
 package org.apache.eagle.datastream.core
 
 import com.typesafe.config.Config
-import org.apache.eagle.datastream.utils.GraphPrinter
-import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+trait StreamContextAdapter{
+  def submit(context:StreamContext):Unit = {
+    execute(context.build)
+  }
+  def execute(dag: StreamDAG)
+}
 
 /**
+ * TODO: Decouple execution environment with stream context
+ *
  * @since 0.3.0
  */
-trait ExecutionEnvironment {
-  def config:Configuration
-
-  /**
-   * Business logic DAG
-   * @return
-   */
-  def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
-
+abstract class ExecutionEnvironment(private val conf:Config)
+  extends StreamContext(conf) with StreamContextAdapter     // Continue to support old API
+  with StreamSourceBuilder
+{
   /**
    * Start to execute
    */
-  def execute():Unit
-
-  /**
-   * Support Java Style Config
-   *
-   * @return
-   */
-  def getConfig:Config = config.get
-}
-
-/**
- * @todo Use Configuration instead of Config
- *
- * @param conf
- */
-abstract class ExecutionEnvironmentBase(private val conf:Config)  extends ExecutionEnvironment with StreamSourceBuilder {
-  implicit private val _dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]](classOf[StreamConnector[Any,Any]])
-  private val _config:Configuration = Configuration(conf)
-
-  override def dag = _dag
-  override def config = _config
-
-  override def execute(): Unit = {
-    implicit val i_conf = _config.get
-    StreamNameExpansion()
-    GraphPrinter.print(dag,message="Before expanded DAG ")
-    StreamAggregateExpansion()
-    GraphPrinter.print(dag,message="after analyze expanded DAG ")
-    StreamAlertExpansion()
-    StreamUnionExpansion()
-    StreamGroupbyExpansion()
-    StreamParallelismConfigExpansion()
-    StreamNameExpansion()
-    GraphPrinter.print(dag,message="After expanded DAG ")
-
-    GraphPrinter.printDotDigraph(dag)
-
-    val streamDAG = StreamDAGTransformer.transform(dag)
-    execute(streamDAG)
+  def execute():Unit = {
+    submit(this)
   }
-
-  protected def execute(dag: StreamDAG)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
index a95001b..ffb4b9e 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAggregateExpansion.scala
@@ -42,7 +42,6 @@ class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(conf
          */
         val newStreamProducers = rewriteWithStreamOutputWrapper(current, dag, toBeAddedEdges, toBeRemovedVertex, upStreamNames)
 
-
         val analyzeExecutors = if (cepQl != null) {
           AggregateExecutorFactory.Instance.createExecutors(cepQl)
         } else {
@@ -50,7 +49,7 @@ class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(conf
         }
 
         analyzeExecutors.foreach(exec => {
-          val t = FlatMapProducer(exec.asInstanceOf[FlatMapper[Any]]).nameAs(exec.getExecutorId() + "_" + exec.getPartitionSeq()).initWith(dag,config, hook = false)
+          val t = FlatMapProducer(exec.asInstanceOf[FlatMapper[Any]]).initWith(dag,config, hook = false).nameAs(exec.getExecutorId + "_" + exec.getPartitionSeq).stream(child.stream)
 
           // connect with previous
           if (strategy == null) {
@@ -70,7 +69,6 @@ class StreamAggregateExpansion(config: Config) extends StreamAlertExpansion(conf
       case _ => 
     }
   }
-  
 }
 
 object StreamAggregateExpansion{

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
index c731ac9..1ef57cc 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamAlertExpansion.scala
@@ -54,6 +54,7 @@ import com.typesafe.config.Config
 
 case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(config) {
   val LOG = LoggerFactory.getLogger(classOf[StreamAlertExpansion])
+  import StreamAlertExpansion._
 
   override def expand(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]): Unit ={
     val iter = dag.iterator()
@@ -78,7 +79,7 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
   def onIteration(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]], 
                dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], child: StreamProducer[Any]): Unit = {
     child match {
-      case AlertStreamSink(upStreamNames, alertExecutorId, withConsumer,strategy) => {
+      case AlertStreamProducer(upStreamNames, alertExecutorId, withConsumer,strategy) => {
         /**
          * step 1: wrapper previous StreamProducer with one more field "streamName"
          * for AlertStreamSink, we check previous StreamProducer and replace that
@@ -114,7 +115,10 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
     }
   }
 
-  protected def rewriteWithStreamOutputWrapper(current: org.apache.eagle.datastream.core.StreamProducer[Any], dag: org.jgrapht.experimental.dag.DirectedAcyclicGraph[org.apache.eagle.datastream.core.StreamProducer[Any],org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeAddedEdges: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeRemovedVertex: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamProducer[Any]], upStreamNames: java.util.List[String]) = {/**
+  protected def rewriteWithStreamOutputWrapper(current: org.apache.eagle.datastream.core.StreamProducer[Any], dag: org.jgrapht.experimental.dag.DirectedAcyclicGraph[org.apache.eagle.datastream.core.StreamProducer[Any],org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeAddedEdges: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamConnector[Any,Any]], toBeRemovedVertex: scala.collection.mutable.ListBuffer[org.apache.eagle.datastream.core.StreamProducer[Any]], upStreamNames: java.util.List[String]) = {
+    if(upStreamNames == null) throw new NullPointerException("upStreamNames is null")
+
+    /**
      * step 1: wrapper previous StreamProducer with one more field "streamName"
      * for AlertStreamSink, we check previous StreamProducer and replace that
      */
@@ -129,20 +133,22 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
           i += 1
         })
       }
-      case _: FlatMapProducer[AnyRef, AnyRef] => {
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+      case p: FlatMapProducer[AnyRef, AnyRef] => {
+        if(upStreamNames.size()>1) throw new IllegalStateException("More than 1 upStreamNames "+upStreamNames+" found for "+p)
+        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames))
       }
-      case _: MapperProducer[AnyRef,AnyRef] => {
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+      case p: MapperProducer[AnyRef,AnyRef] => {
+        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(p,upStreamNames))
       }
       case s: StreamProducer[AnyRef] if dag.inDegreeOf(s) == 0 => {
-        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, upStreamNames.get(0))
+        newStreamProducers += replace(toBeAddedEdges, toBeRemovedVertex, dag, current, recognizeSingleStreamName(s,upStreamNames))
       }
       case p@_ => throw new IllegalStateException(s"$p can not be put before AlertStreamSink, only StreamUnionProducer,FlatMapProducer and MapProducer are supported")
     }
     newStreamProducers
   }
 
+
   protected def replace(toBeAddedEdges: ListBuffer[StreamConnector[Any,Any]], toBeRemovedVertex: ListBuffer[StreamProducer[Any]],
                       dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]], current: StreamProducer[Any], upStreamName: String) : StreamProducer[Any]= {
     var newsp: StreamProducer[Any] = null
@@ -152,11 +158,11 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
         mapper match {
           case a: JavaStormStreamExecutor[EagleTuple] => {
             val newmapper = new JavaStormExecutorForAlertWrapper(a.asInstanceOf[JavaStormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false)
+            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId)
           }
           case b: StormStreamExecutor[EagleTuple] => {
             val newmapper = StormExecutorForAlertWrapper(b.asInstanceOf[StormStreamExecutor[Tuple2[String, util.SortedMap[AnyRef, AnyRef]]]], upStreamName)
-            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false)
+            newsp = FlatMapProducer(newmapper).initWith(dag,config,hook = false).stream(current.streamId)
           }
           case _ => throw new IllegalArgumentException
         }
@@ -176,8 +182,8 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
           }
         }
         current match {
-          case MapperProducer(2, fn) => newsp = MapperProducer(3, newfun)
-          case _ => throw new IllegalArgumentException
+          case MapperProducer(_, fn) => newsp = MapperProducer(3, newfun).initWith(dag,config,hook = false).stream(current.stream)
+          case _ => throw new IllegalArgumentException(s"Illegal producer $current")
         }
         val incomingEdges = dag.incomingEdgesOf(current)
         incomingEdges.foreach(e => toBeAddedEdges += StreamConnector(e.from, newsp))
@@ -196,7 +202,7 @@ case class StreamAlertExpansion(config: Config) extends StreamDAGExpansion(confi
             }
           }
         }
-        newsp = MapperProducer(3,fn)
+        newsp = MapperProducer(3,fn).initWith(dag,config,hook = false).stream(s.stream)
         toBeAddedEdges += StreamConnector(current,newsp)
         val outgoingEdges = dag.outgoingEdgesOf(current)
         outgoingEdges.foreach(e => toBeAddedEdges += StreamConnector(newsp,e.to))
@@ -213,6 +219,35 @@ object StreamAlertExpansion{
     e.expand(dag)
     e
   }
+
+  /**
+    * Try upStreamNames firstly, otherwise try producer.streamId
+    *
+    * @param producer
+    * @param upStreamNames
+    * @return
+    */
+  private def recognizeSingleStreamName(producer: StreamProducer[AnyRef],upStreamNames:util.List[String]):String = {
+    if(upStreamNames == null){
+      producer.streamId
+    }else if(upStreamNames.size()>1){
+      if(producer.streamId == null) {
+        if (upStreamNames.size() > 1)
+          throw new IllegalStateException("Too many (more than 1) upStreamNames " + upStreamNames + " given for " + producer)
+        else
+          upStreamNames.get(0)
+      } else {
+        producer.streamId
+      }
+    } else if(upStreamNames.size() == 1){
+      upStreamNames.get(0)
+    }else {
+      if(producer.streamId == null){
+        throw new IllegalArgumentException("No stream name found for "+producer)
+      } else
+        producer.streamId
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
new file mode 100644
index 0000000..6e21bcc
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamBuilder.scala
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.core
+
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.dataproc.util.ConfigOptionParser
+import org.apache.eagle.datastream.ExecutionEnvironments
+import org.apache.eagle.datastream.utils.GraphPrinter
+import org.jgrapht.experimental.dag.DirectedAcyclicGraph
+
+import scala.reflect.runtime.universe._
+
+trait StreamContextBuilder extends StreamSourceBuilder {
+  def config:Configuration
+  /**
+   * Business logic DAG
+   * @return
+   */
+  def dag:DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]
+  /**
+   * Support Java Style Config
+   *
+   * @return
+   */
+  def getConfig:Config = config.get
+  def build:StreamDAG
+  def submit[E<:ExecutionEnvironment](implicit typeTag:TypeTag[E]):Unit
+  def submit(env:ExecutionEnvironment):Unit
+  def submit(clazz:Class[ExecutionEnvironment]):Unit
+}
+
+class StreamContext(private val conf:Config) extends StreamContextBuilder{
+  implicit private val _dag = new DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]](classOf[StreamConnector[Any,Any]])
+  private val _config:Configuration = Configuration(conf)
+  override def dag = _dag
+  override def config = _config
+  override def build: StreamDAG = {
+    implicit val i_conf = _config.get
+    StreamNameExpansion()
+    GraphPrinter.print(dag,message="Before expanded DAG ")
+    StreamAggregateExpansion()
+    StreamAlertExpansion()
+    StreamUnionExpansion()
+    StreamGroupbyExpansion()
+    StreamParallelismConfigExpansion()
+    StreamNameExpansion()
+    GraphPrinter.print(dag,message="After expanded DAG ")
+    GraphPrinter.printDotDigraph(dag)
+    StreamDAGTransformer.transform(dag)
+  }
+
+  override def submit(env: ExecutionEnvironment): Unit = {
+    env.submit(this)
+  }
+
+  override def submit(clazz: Class[ExecutionEnvironment]): Unit = {
+    ExecutionEnvironments.get(clazz,conf).submit(this)
+  }
+
+  override def submit[E <: ExecutionEnvironment](implicit typeTag: TypeTag[E]): Unit = {
+    ExecutionEnvironments.getWithConfig[E](conf).submit(this)
+  }
+}
+
+object StreamContext {
+  /**
+   * @return
+   */
+  def apply():StreamContext = {
+    new StreamContext(ConfigFactory.load())
+  }
+
+  /**
+   *
+   * @param args
+   * @return
+   */
+  def apply(args:Array[String]):StreamContext ={
+    new StreamContext(new ConfigOptionParser().load(args))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
index 255f031..7845740 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAG.scala
@@ -27,7 +27,8 @@ import scala.collection.{JavaConversions, mutable}
  * wrapper of DAG, used for storm topology compiler
  */
 class StreamDAG(val graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) extends StreamProducerGraph {
-  var nodeMap: mutable.Map[String, StreamProducer[Any]] = null
+  var nodeMap: mutable.Map[String, StreamProducer[Any]] = mutable.Map[String,StreamProducer[Any]]()
+  graph.iterator().asScala.foreach(p=> nodeMap.put(p.name,p))
 
   override def addEdge(from: StreamProducer[Any], to: StreamProducer[Any], streamConnector: StreamConnector[Any,Any]): Unit = {
     graph.addEdge(from, to, streamConnector)
@@ -35,6 +36,7 @@ class StreamDAG(val graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConne
 
   override def addVertex(producer: StreamProducer[Any]): Unit = {
     graph.addVertex(producer)
+    nodeMap.put(producer.name,producer)
   }
 
   override def iterator(): Iterator[StreamProducer[Any]] = {
@@ -65,4 +67,4 @@ class StreamDAG(val graph: DirectedAcyclicGraph[StreamProducer[Any], StreamConne
     graph.incomingEdgesOf(v).asScala.foreach(e => set += graph.getEdgeSource(e))
     set
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
index 32947b9..6b20bf2 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamDAGTransformer.scala
@@ -31,6 +31,7 @@ object StreamDAGTransformer {
    * @param dag DirectedAcyclicGraph[StreamProducer, StreamConnector]
    * @return StormStreamDAG
    */
+  @deprecated("Use StreamDAG(dag) will transform directly")
   def transform(dag: DirectedAcyclicGraph[StreamProducer[Any], StreamConnector[Any,Any]]) : StreamDAG = {
     val stormDAG = new StreamDAG(dag)
     val nodeMap = mutable.HashMap[String, StreamProducer[Any]]()

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
index e01ffa4..8699da6 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamParallelismConfigExpansion.scala
@@ -45,9 +45,13 @@ case class StreamParallelismConfigExpansion(config: Config) extends StreamDAGExp
   }
 
   private def getParallelismMap(config: Config) : Map[Pattern, Int]= {
-    val parallelismConfig: ConfigObject = config.getObject("envContextConfig.parallelismConfig")
-    parallelismConfig.asScala.toMap map {
-      case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int])
+    if(config.hasPath("envContextConfig.parallelismConfig")) {
+      val parallelismConfig: ConfigObject = config.getObject("envContextConfig.parallelismConfig")
+      parallelismConfig.asScala.toMap map {
+        case (name, value) => (Pattern.compile(name), value.asInstanceOf[ConfigValue].unwrapped().asInstanceOf[Int])
+      }
+    }else{
+      Map[Pattern,Int]()
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
index 4d81424..18acd9c 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProducer.scala
@@ -1,20 +1,18 @@
 /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
  *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.eagle.datastream.core
 
@@ -24,7 +22,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import backtype.storm.topology.base.BaseRichSpout
 import com.typesafe.config.Config
 import org.apache.eagle.alert.entity.AlertAPIEntity
-import org.apache.eagle.datastream.FlatMapper
+import org.apache.eagle.datastream.{FlatMapperWrapper, Collector, FlatMapper}
 import org.apache.eagle.partition.PartitionStrategy
 import org.apache.eagle.policy.common.Constants
 import org.jgrapht.experimental.dag.DirectedAcyclicGraph
@@ -73,48 +71,54 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
 
   override def filter(fn : T => Boolean): StreamProducer[T] ={
     val ret = FilterProducer[T](fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R] = {
     val ret = FlatMapProducer[T,R](flatMapper)
-    hookup(this, ret)
+    connect(this, ret)
+    ret
+  }
+  override def flatMap[R](func:(Any,Collector[R])=>Unit): StreamProducer[R] = {
+    val ret = FlatMapProducer[T,R](FlatMapperWrapper[R](func))
+    connect(this, ret)
     ret
   }
 
+
   override def foreach(fn : T => Unit) : Unit = {
     val ret = ForeachProducer[T](fn)
-    hookup(this, ret)
+    connect(this, ret)
   }
 
   override def map[R](fn : T => R) : StreamProducer[R] = {
     val ret = MapperProducer[T,R](0,fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def map1[R](fn : T => R): StreamProducer[R] = {
     val ret = MapperProducer[T,R](1, fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def map2[R](fn : T => R): StreamProducer[R] = {
     val ret = MapperProducer[T,R](2, fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def map3[R](fn : T => R): StreamProducer[R] = {
     val ret = MapperProducer(3, fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def map4[R](fn : T => R): StreamProducer[R] = {
     val ret = MapperProducer(4, fn)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
@@ -125,7 +129,15 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
     // validate each field index is greater or equal to 0
     fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
     val ret = GroupByFieldProducer[T](fields)
-    hookup(this, ret)
+    connect(this, ret)
+    ret
+  }
+
+  def groupByFieldIndex(fields : Seq[Int]) : StreamProducer[T] = {
+    // validate each field index is greater or equal to 0
+    fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
+    val ret = GroupByFieldProducer[T](fields)
+    connect(this, ret)
     ret
   }
 
@@ -134,19 +146,19 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
     // validate each field index is greater or equal to 0
     fields.foreach(n => if(n<0) throw new IllegalArgumentException("field index should be always >= 0"))
     val ret = GroupByFieldProducer[T](fields.asScala.toSeq.asInstanceOf[Seq[Int]])
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   override def groupByKey(keySelector: T=> Any):StreamProducer[T] = {
     val ret = GroupByKeyProducer(keySelector)
-    hookup(this,ret)
+    connect(this,ret)
     ret
   }
 
   override def streamUnion[T2,T3](others : Seq[StreamProducer[T2]]) : StreamProducer[T3] = {
     val ret = StreamUnionProducer[T, T2, T3](others)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
@@ -158,7 +170,7 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
 
   override def groupBy(strategy : PartitionStrategy) : StreamProducer[T] = {
     val ret = GroupByStrategyProducer(strategy)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
@@ -173,9 +185,10 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
     alert(upStreamNames.asScala, alertExecutorId, consume = false)
   }
 
-  def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean=true, strategy : PartitionStrategy=null) = {
-    val ret = AlertStreamSink(upStreamNames, alertExecutorId, consume, strategy)
-    hookup(this, ret)
+  override def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean=true, strategy : PartitionStrategy=null):AlertStreamProducer = {
+    val ret = AlertStreamProducer(upStreamNames, alertExecutorId, consume, strategy)
+    connect(this, ret)
+    ret
   }
 
   def alertWithConsumer(upStreamName: String, alertExecutorId : String, strategy: PartitionStrategy): Unit ={
@@ -196,28 +209,36 @@ abstract class StreamProducer[+T <: Any] extends StreamInfo with StreamProtocol[
 
   def aggregate(upStreamNames: java.util.List[String], queryExecutorId : String, strategy: PartitionStrategy = null): StreamProducer[T] = {
     val ret= AggregateProducer(upStreamNames, queryExecutorId, null, strategy)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
   def aggregate(cql : String, strategy: PartitionStrategy): StreamProducer[T] = {
     val ret= AggregateProducer(util.Arrays.asList(Constants.EAGLE_DEFAULT_POLICY_NAME), null, cql, strategy)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
   
   def persist(executorId : String, storageType: StorageType.StorageType) : StreamProducer[T] = {
     val ret = PersistProducer(executorId, storageType)
-    hookup(this, ret)
+    connect(this, ret)
     ret
   }
 
-  protected def hookup[T1,T2](current: StreamProducer[T1], next: StreamProducer[T2]) = {
+  def connect[T1,T2](current: StreamProducer[T1], next: StreamProducer[T2]) = {
+    if(current.graph == null) throw new NullPointerException(s"$current has not been registered to any graph before being connected")
     current.graph.addVertex(next)
     current.graph.addEdge(current, next, StreamConnector(current, next))
     passOnContext[T1,T2](current, next)
   }
 
+  def connect[T2]( next: StreamProducer[T2]) = {
+    if(this.graph == null) throw new NullPointerException("graph is null")
+    this.graph.addVertex(next)
+    this.graph.addEdge(this, next, StreamConnector(this, next))
+    passOnContext[T,T2](this, next)
+  }
+
   private def passOnContext[T1 ,T2](current: StreamProducer[T1], next: StreamProducer[T2]): Unit ={
     next.initWith(current.graph,current.config)
   }
@@ -288,16 +309,21 @@ case class StormSourceProducer[T](source: BaseRichSpout) extends StreamProducer[
   }
 }
 
-case class IterableStreamProducer[T](iterable: Iterable[T],recycle:Boolean = false) extends StreamProducer[T]
+case class IterableStreamProducer[T](iterable: Iterable[T],recycle:Boolean = false) extends StreamProducer[T]{
+  override def toString: String = s"IterableStreamProducer(${iterable.getClass.getSimpleName}))"
+}
+case class IteratorStreamProducer[T](iterator: Iterator[T]) extends StreamProducer[T]{
+  override def toString: String = s"IteratorStreamProducer(${iterator.getClass.getSimpleName})"
+}
 
-case class AlertStreamSink(upStreamNames: util.List[String], alertExecutorId : String, var consume: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer[AlertAPIEntity] {
-  def consume(consume: Boolean): AlertStreamSink = {
+case class AlertStreamProducer(var upStreamNames: util.List[String], alertExecutorId : String, var consume: Boolean=true, strategy: PartitionStrategy=null) extends StreamProducer[AlertAPIEntity] {
+  def consume(consume: Boolean): AlertStreamProducer = {
     this.consume = consume
     this
   }
 }
 
-case class AggregateProducer[T](upStreamNames: util.List[String], analyzerId : String, cepQl: String = null, strategy:PartitionStrategy = null) extends StreamProducer[T]
+case class AggregateProducer[T](var upStreamNames: util.List[String], analyzerId : String, cepQl: String = null, strategy:PartitionStrategy = null) extends StreamProducer[T]
 
 case class PersistProducer[T](executorId :String, storageType: StorageType.StorageType) extends StreamProducer[T]
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
index 3ed067d..1330f06 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamProtocol.scala
@@ -19,12 +19,10 @@ package org.apache.eagle.datastream.core
 
 import com.typesafe.config.Config
 import org.apache.commons.lang3.builder.HashCodeBuilder
-import org.apache.eagle.datastream.FlatMapper
+import org.apache.eagle.datastream.{Collector, FlatMapper}
 import org.apache.eagle.partition.PartitionStrategy
 import org.jgrapht.experimental.dag.DirectedAcyclicGraph
 
-import scala.reflect.runtime.{universe => ru}
-
 /**
  * StreamInfo should be fully serializable and having not runtime type information
  */
@@ -39,7 +37,11 @@ class StreamInfo  extends Serializable{
    */
   var name: String = null
 
+  /**
+    * Output stream id, equals to name by default
+    */
   var streamId:String=null
+
   var parallelismNum: Int = 1
 
   /**
@@ -108,6 +110,7 @@ trait StreamProtocol[+T <: Any]{
    * @return
    */
   def flatMap[R](flatMapper:FlatMapper[R]): StreamProducer[R]
+  def flatMap[R](func:(Any,Collector[R])=>Unit): StreamProducer[R]
 
   /**
    *
@@ -152,7 +155,7 @@ trait StreamProtocol[+T <: Any]{
   def groupByKey(keyer:T => Any):StreamProducer[T]
 
   def streamUnion[T2,T3](otherStreams : Seq[StreamProducer[T2]]) : StreamProducer[T3]
-  def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean,strategy : PartitionStrategy)
+  def alert(upStreamNames: Seq[String], alertExecutorId : String, consume: Boolean,strategy : PartitionStrategy):AlertStreamProducer
 
   def aggregate(upStreamNames: java.util.List[String], executorId :String, strategy:PartitionStrategy): StreamProducer[T]
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
index 9884e88..5f3bd22 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/core/StreamSourceBuilder.scala
@@ -44,4 +44,20 @@ trait StreamSourceBuilder {
     p.initWith(dag,config.get)
     p
   }
+
+  def from[T:ru.TypeTag](iterator: Iterator[T],recycle:Boolean):IteratorStreamProducer[T]={
+    val p = IteratorStreamProducer[T](iterator)
+    p.initWith(dag,config.get)
+    p
+  }
+
+  def from(product: Product):IteratorStreamProducer[Any]={
+    val p = IteratorStreamProducer[Any](product.productIterator)
+    p.initWith(dag,config.get)
+    p
+  }
+
+  def register[T](producer:StreamProducer[T]):Unit = {
+    producer.initWith(dag,config.get)
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
index e109118..64b5f0f 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/AbstractStreamBolt.scala
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory
  * @param streamInfo
  * @tparam T
  */
-abstract class AbstractStreamBolt[T](val fieldsNum:Int=0, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt{
+abstract class AbstractStreamBolt[T](val fieldsNum:Int=1, val ack:Boolean = true)(implicit streamInfo:StreamInfo) extends BaseRichBolt{
   private var _collector: OutputCollector = null
   private val LOG = LoggerFactory.getLogger(classOf[AbstractStreamBolt[T]])
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
index e3b6e25..c64ea83 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IterableStreamSpout.scala
@@ -52,10 +52,10 @@ case class IterableStreamSpout(iterable: Iterable[Any],recycle:Boolean = true)(i
         _collector.emit(List(current).asJava)
       }
     }else if(recycle){
-      LOG.info("Recycling the iterator")
+      if(LOG.isDebugEnabled) LOG.debug("Recycling the iterator")
       _iterator = iterable.iterator
     }else{
-      LOG.info("No tuple left, sleep forever")
+      if(LOG.isDebugEnabled) LOG.debug("No tuple left, sleep forever")
       this.deactivate()
       Utils.sleep(Long.MaxValue)
     }
@@ -65,7 +65,7 @@ case class IterableStreamSpout(iterable: Iterable[Any],recycle:Boolean = true)(i
     if(info.outKeyed) {
       declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
     }else{
-      declarer.declare(new Fields(NameConstants.FIELD_PREFIX))
+      declarer.declare(new Fields(s"${NameConstants.FIELD_PREFIX}0"))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
new file mode 100644
index 0000000..ea6d658
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/IteratorStreamSpout.scala
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.spout.SpoutOutputCollector
+import backtype.storm.task.TopologyContext
+import backtype.storm.topology.OutputFieldsDeclarer
+import backtype.storm.topology.base.BaseRichSpout
+import backtype.storm.tuple.Fields
+import backtype.storm.utils.Utils
+import org.apache.eagle.datastream.core.StreamInfo
+import org.apache.eagle.datastream.utils.NameConstants
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+case class IteratorStreamSpout(iterator: Iterator[Any])(implicit info:StreamInfo) extends BaseRichSpout {
+  val LOG = LoggerFactory.getLogger(classOf[IteratorStreamSpout])
+  var _collector:SpoutOutputCollector=null
+  var _iterator:Iterator[Any] = null
+
+  override def open(conf: util.Map[_, _], context: TopologyContext, collector: SpoutOutputCollector): Unit = {
+    this._collector = collector
+    this._iterator = iterator
+  }
+
+  override def nextTuple(): Unit = {
+    if(_iterator.hasNext){
+      val current = _iterator.next().asInstanceOf[AnyRef]
+      if(info.outKeyed) {
+        _collector.emit(List(info.keySelector.key(current),current).asJava.asInstanceOf[util.List[AnyRef]])
+      }else{
+        _collector.emit(List(current).asJava)
+      }
+    }else{
+      LOG.info("No tuple left, sleep forever")
+      this.deactivate()
+      Utils.sleep(Long.MaxValue)
+    }
+  }
+
+  override def declareOutputFields(declarer: OutputFieldsDeclarer): Unit = {
+    if(info.outKeyed) {
+      declarer.declare(new Fields(NameConstants.FIELD_KEY,NameConstants.FIELD_VALUE))
+    }else{
+      declarer.declare(new Fields(s"${NameConstants.FIELD_PREFIX}0"))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
index 0001d2f..19305fa 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/JsonMessageDeserializer.scala
@@ -33,11 +33,15 @@ case class JsonMessageDeserializer(props:Properties) extends SpoutKafkaMessageDe
 
   override def deserialize(bytes: Array[Byte]): AnyRef = {
     var map: util.Map[String, _] = null
-    try {
-      map = objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]])
-    } catch {
-      case e: IOException => {
-        LOG.error("Failed to deserialize json from: " + new String(bytes), e)
+    if(bytes == null || bytes.length == 0){
+      if(LOG.isDebugEnabled) LOG.debug("Skip empty message")
+    }else {
+      try {
+        map = objectMapper.readValue(bytes, classOf[util.TreeMap[String, _]])
+      } catch {
+        case e: IOException => {
+          LOG.error("Failed to deserialize json from: " + new String(bytes), e)
+        }
       }
     }
     map

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
index 29b5cf4..42a030d 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormBoltFactory.scala
@@ -35,9 +35,14 @@ object StormBoltFactory {
         }else if(worker.isInstanceOf[StormStreamExecutor[EagleTuple]]){
           worker.asInstanceOf[StormStreamExecutor[EagleTuple]].prepareConfig(config)
           StormBoltWrapper(worker.asInstanceOf[StormStreamExecutor[EagleTuple]])
-        }else {
-          throw new UnsupportedOperationException
+        }else if(worker.isInstanceOf[FlatMapperWrapper[Any]]){
+          StormFlatFunctionWrapper(worker.asInstanceOf[FlatMapperWrapper[Any]].func)
+        } else {
+          StormFlatMapperWrapper(worker)
         }
+//        else {
+//          throw new UnsupportedOperationException(s"Unsupported FlatMapperProducer type: $producer")
+//        }
       }
       case filter:FilterProducer[Any] => {
         FilterBoltWrapper(filter.fn)
@@ -49,8 +54,8 @@ object StormBoltFactory {
         ForeachBoltWrapper(foreach.fn)
       }
       case persist : PersistProducer[Any] => {
-        val persisExecutor = new PersistExecutor(persist.executorId, persist.storageType.toString());
-        persisExecutor.prepareConfig(config);
+        val persisExecutor = new PersistExecutor(persist.executorId, persist.storageType.toString)
+        persisExecutor.prepareConfig(config)
         JavaStormBoltWrapper(persisExecutor.asInstanceOf[JavaStormStreamExecutor[EagleTuple]])
       }
       case _ => throw new UnsupportedOperationException(s"Unsupported producer: ${producer.toString}")

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
index 4e7d743..4165db4 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormExecutionEnvironment.scala
@@ -19,15 +19,12 @@ package org.apache.eagle.datastream.storm
 import backtype.storm.topology.base.BaseRichSpout
 import com.typesafe.config.Config
 import org.apache.eagle.dataproc.impl.storm.StormSpoutProvider
-import org.apache.eagle.datastream.core.{ExecutionEnvironmentBase, StormSourceProducer, StreamDAG}
-
+import org.apache.eagle.datastream.core.{ExecutionEnvironment, StormSourceProducer, StreamDAG}
 
 /**
  * @since  12/7/15
  */
-case class StormExecutionEnvironment(private val conf:Config) extends ExecutionEnvironmentBase(conf){
-
-
+class StormExecutionEnvironment(private val conf:Config) extends ExecutionEnvironment(conf) {
   override def execute(dag: StreamDAG) : Unit = {
     StormTopologyCompiler(config.get, dag).buildTopology.execute
   }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
new file mode 100644
index 0000000..e5eea1f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatFunctionWrapper.scala
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import java.util
+
+import backtype.storm.tuple.Tuple
+import org.apache.eagle.datastream.Collector
+import org.apache.eagle.datastream.core.StreamInfo
+
+case class StormFlatFunctionWrapper(flatMapper:(Any,Collector[Any])=>Unit)(implicit info:StreamInfo) extends AbstractStreamBolt[Any]{
+  /**
+   * Handle keyed stream value
+   */
+  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit = {
+    flatMapper(value,new Collector[Any] {
+      override def collect(r: Any): Unit = emit(r)(input)
+    })
+  }
+
+  /**
+   * Handle general stream values list
+   *
+   * @param values
+   */
+  override def onValues(values: util.List[AnyRef])(implicit input: Tuple): Unit = {
+    flatMapper(values,new Collector[Any] {
+      override def collect(r: Any): Unit = emit(r)(input)
+    })
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
new file mode 100644
index 0000000..e5fb86d
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormFlatMapperWrapper.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.datastream.storm
+
+import backtype.storm.tuple.Tuple
+import org.apache.eagle.datastream.{Collector, FlatMapper}
+import org.apache.eagle.datastream.core.StreamInfo
+import scala.collection.JavaConverters._
+
+case class StormFlatMapperWrapper(flatMapper:FlatMapper[Any])(implicit info:StreamInfo) extends AbstractStreamBolt[Any]{
+  /**
+   * Handle keyed stream value
+   */
+  override def onKeyValue(key: Any, value: Any)(implicit input: Tuple): Unit = {
+    flatMapper.flatMap(value.asInstanceOf[Seq[AnyRef]],new Collector[Any] {
+      override def collect(r: Any): Unit = emit(r)(input)
+    })
+  }
+
+  /**
+   * Handle general stream values list
+   *
+   * @param values
+   */
+  override def onValues(values: java.util.List[AnyRef])(implicit input: Tuple): Unit = {
+    flatMapper.flatMap(values.asScala,new Collector[Any] {
+      override def collect(r: Any): Unit = emit(r)(input)
+    })
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
index 74ed11d..6a3b606 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormSpoutFactory.scala
@@ -22,7 +22,7 @@ import java.util
 
 import backtype.storm.topology.base.BaseRichSpout
 import com.typesafe.config.Config
-import org.apache.eagle.datastream.core.{IterableStreamProducer, StormSourceProducer, StreamProducer}
+import org.apache.eagle.datastream.core.{IteratorStreamProducer, IterableStreamProducer, StormSourceProducer, StreamProducer}
 import org.apache.eagle.datastream.utils.NameConstants
 
 object StormSpoutFactory {
@@ -37,6 +37,8 @@ object StormSpoutFactory {
         }
       case p@IterableStreamProducer(iterable,recycle) =>
         IterableStreamSpout(iterable,recycle)
+      case p@IteratorStreamProducer(iterator) =>
+        IteratorStreamSpout(iterator)
       case _ =>
         throw new IllegalArgumentException(s"Cannot compile unknown $from to a Storm Spout")
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
index 18f52cb..f572377 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyCompiler.scala
@@ -36,7 +36,7 @@ case class StormTopologyCompiler(config: Config, graph: StreamProducerGraph) ext
   val boltCache = scala.collection.mutable.Map[StreamProducer[Any], StormBoltWrapper]()
 
   override def buildTopology: AbstractTopologyExecutor ={
-    val builder = new TopologyBuilder();
+    val builder = new TopologyBuilder()
     val iter = graph.iterator()
     val boltDeclarerCache = scala.collection.mutable.Map[String, BoltDeclarer]()
     val stormTopologyGraph = ListBuffer[String]()



[4/4] incubator-eagle git commit: EAGLE-130 Eagle Pipeline DSL: Parser, Compiler, Runner

Posted by ha...@apache.org.
EAGLE-130 Eagle Pipeline DSL: Parser, Compiler, Runner

https://issues.apache.org/jira/browse/EAGLE-130

Compile DSL Configure to Pipeline model
Compile Pipeline model to Stream Execution Graph
Submit Stream Execution Graph to actual running environment say storm
Support Alert and Persistence for metric monitoring
Pipeline runner CLI tool and shell script
Decouple pipeline compiler and scheduler into individual modules
Fix configuration conflict, should pass through Config instead of
ConfigFactory.load() manually
Override application configuration with pipeline configuration
Supports inputs field to define connector

Author: @haoch <ha...@apache.org>
Reviewer: @haoch <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c1485aac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c1485aac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c1485aac

Branch: refs/heads/master
Commit: c1485aac543cff1245b7eab359a89d60c35424b1
Parents: f6c63e7
Author: Hao Chen <ha...@apache.org>
Authored: Wed Jan 20 14:47:24 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Jan 20 14:47:24 2016 +0800

----------------------------------------------------------------------
 .travis.yml                                     |  22 ++
 CONTRIBUTING.md                                 |   2 +
 .../src/main/bin/eagle-create-table.rb          |   2 +
 .../executor/AlertExecutorCreationUtils.java    |   2 +
 .../src/test/resources/str.siddhiext            |  13 +-
 .../eagle-stream-pipeline/README.md             |  61 +++++
 .../eagle-stream-pipeline/pom.xml               | 154 ++++++++++++
 .../apache/eagle/stream/pipeline/Pipeline.scala |  27 +++
 .../stream/pipeline/annotation/Extension.scala  |  21 ++
 .../pipeline/compiler/PipelineCompiler.scala    |  61 +++++
 .../pipeline/extension/ModuleManager.scala      | 182 ++++++++++++++
 .../eagle/stream/pipeline/parser/DataFlow.scala | 235 +++++++++++++++++++
 .../eagle/stream/pipeline/parser/Pipeline.scala |  89 +++++++
 .../eagle/stream/pipeline/parser/Schema.scala   | 152 ++++++++++++
 .../stream/pipeline/runner/PipelineRunner.scala | 115 +++++++++
 .../stream/pipeline/utils/Exceptions.scala      |  20 ++
 .../src/test/resources/application.conf         |  34 +++
 .../src/test/resources/eagle-pipeline.sh        |  20 ++
 .../src/test/resources/log4j.properties         |  19 ++
 .../src/test/resources/pipeline_1.conf          | 131 +++++++++++
 .../src/test/resources/pipeline_2.conf          |  93 ++++++++
 .../src/test/resources/pipeline_3.conf          | 152 ++++++++++++
 .../src/test/resources/pipeline_4.conf          | 122 ++++++++++
 .../eagle/stream/pipeline/ConfigSpec.scala      |  37 +++
 .../eagle/stream/pipeline/DataFlowSpec.scala    | 113 +++++++++
 .../eagle/stream/pipeline/PipelineSpec.scala    |  57 +++++
 .../aggregate/AggregateExecutorFactory.java     |  13 +-
 .../impl/aggregate/SimpleAggregateExecutor.java |  11 +-
 .../impl/storm/kafka/JsonSerializer.java        |  58 +++++
 .../storm/kafka/KafkaSourcedSpoutProvider.java  |  42 ++--
 .../storm/kafka/KafkaSourcedSpoutScheme.java    |   2 +-
 .../eagle/datastream/ExecutionEnvironment.scala | 134 -----------
 .../datastream/ExecutionEnvironments.scala      | 140 +++++++++++
 .../datastream/core/ExecutionEnvironment.scala  |  67 ++----
 .../core/StreamAggregateExpansion.scala         |   4 +-
 .../datastream/core/StreamAlertExpansion.scala  |  59 ++++-
 .../eagle/datastream/core/StreamBuilder.scala   |  95 ++++++++
 .../eagle/datastream/core/StreamDAG.scala       |   6 +-
 .../datastream/core/StreamDAGTransformer.scala  |   1 +
 .../core/StreamParallelismConfigExpansion.scala |  10 +-
 .../eagle/datastream/core/StreamProducer.scala  | 104 +++++---
 .../eagle/datastream/core/StreamProtocol.scala  |  11 +-
 .../datastream/core/StreamSourceBuilder.scala   |  16 ++
 .../datastream/storm/AbstractStreamBolt.scala   |   2 +-
 .../datastream/storm/IterableStreamSpout.scala  |   6 +-
 .../datastream/storm/IteratorStreamSpout.scala  |  65 +++++
 .../storm/JsonMessageDeserializer.scala         |  14 +-
 .../datastream/storm/StormBoltFactory.scala     |  13 +-
 .../storm/StormExecutionEnvironment.scala       |   7 +-
 .../storm/StormFlatFunctionWrapper.scala        |  45 ++++
 .../storm/StormFlatMapperWrapper.scala          |  44 ++++
 .../datastream/storm/StormSpoutFactory.scala    |   4 +-
 .../storm/StormTopologyCompiler.scala           |   2 +-
 .../storm/StormTopologyExecutorImpl.scala       |   9 +-
 .../datastream/TestExecutionEnvironment.scala   |   2 +-
 .../eagle/datastream/TestTypeSafedDSL.scala     |  28 +++
 .../util/AbstractConfigOptionParser.java        |   6 +-
 .../eagle/dataproc/util/ConfigOptionParser.java |   3 +
 .../org/apache/eagle/datastream/Collector.scala |   4 +-
 .../apache/eagle/datastream/FlatMapper.scala    |   4 +
 eagle-core/eagle-data-process/pom.xml           |   6 +-
 .../eagle/alert/entity/AlertExecutorEntity.java |   3 +
 .../policy/siddhi/SiddhiPolicyEvaluator.java    |  46 ++--
 .../siddhi/SiddhiStreamMetadataUtils.java       |   2 +-
 .../src/main/resources/eagle.siddhiext          |  17 ++
 .../src/main/resources/str.siddhiext            |  13 +-
 .../eagle/storage/jdbc/TestJdbcStorage.java     |   2 +-
 eagle-samples/pom.xml                           |  31 +--
 .../entity/FileSensitivityAPIEntity.java        |  52 ++++
 .../eagle/security/entity/FileStatusEntity.java | 176 ++++++++++++++
 .../security/entity/HbaseResourceEntity.java    | 105 +++++++++
 .../HbaseResourceSensitivityAPIEntity.java      |  47 ++++
 .../entity/HdfsUserCommandPatternEntity.java    |  82 +++++++
 .../security/entity/HiveResourceEntity.java     | 104 ++++++++
 .../HiveResourceSensitivityAPIEntity.java       |  53 +++++
 .../eagle/security/entity/IPZoneEntity.java     |  52 ++++
 .../entity/SecurityEntityRepository.java        |  32 +++
 .../security/hbase/HbaseResourceEntity.java     | 105 ---------
 .../HbaseResourceSensitivityAPIEntity.java      |  47 ----
 .../hbase/HbaseSecurityEntityRepository.java    |  26 --
 .../hdfs/entity/FileSensitivityAPIEntity.java   |  52 ----
 .../security/hdfs/entity/FileStatusEntity.java  | 176 --------------
 .../entity/HDFSSecurityEntityRepository.java    |  27 ---
 .../entity/HdfsUserCommandPatternEntity.java    |  82 -------
 .../security/hdfs/entity/IPZoneEntity.java      |  52 ----
 .../hive/entity/HiveResourceEntity.java         | 104 --------
 .../HiveResourceSensitivityAPIEntity.java       |  53 -----
 .../entity/HiveSecurityEntityRepository.java    |  25 --
 ...baseResourceSensitivityDataJoinExecutor.java |   2 +-
 .../HbaseResourceSensitivityPollingJob.java     |   5 +-
 .../hbase/HbaseMetadataBrowseWebResource.java   |   2 +-
 .../hbase/HbaseMetadataBrowseWebResponse.java   |   2 +-
 .../hbase/HbaseSensitivityResourceService.java  |   2 +-
 .../dao/HbaseMetadataAccessConfigDAOImpl.java   |   3 +-
 .../FileSensitivityDataJoinExecutor.java        |   3 +-
 .../HdfsUserCommandPatternByDBImpl.java         |   2 +-
 .../HdfsUserCommandPatternByFileImpl.java       |   2 +-
 .../auditlog/HdfsUserCommandPatternDAO.java     |   2 +-
 .../auditlog/HdfsUserCommandReassembler.java    |   2 +-
 .../auditlog/IPZoneDataJoinExecutor.java        |   3 +-
 .../timer/FileSensitivityPollingJob.java        |   4 +-
 .../auditlog/timer/IPZonePollingJob.java        |   3 +-
 .../TestHdfsUserCommandPatternByDB.java         |   2 +-
 .../TestHdfsUserCommandPatternByFile.java       |   3 +-
 .../hdfs/HDFSResourceSensitivityDataJoiner.java |   2 +-
 .../hdfs/HDFSResourceSensitivityService.java    |   2 +-
 .../hdfs/rest/HDFSResourceWebResource.java      |   2 +-
 .../hdfs/rest/HDFSResourceWebResponse.java      |   2 +-
 .../dao/HiveSensitivityMetadataDAOImpl.java     |   2 +-
 .../hive/res/HiveMetadataBrowseWebResource.java |   2 +-
 .../hive/res/HiveMetadataBrowseWebResponse.java |   2 +-
 ...HiveResourceSensitivityDataJoinExecutor.java |   2 +-
 .../HiveResourceSensitivityPollingJob.java      |   3 +-
 eagle-topology-assembly/pom.xml                 |   5 +
 eagle-webservice/pom.xml                        |   5 +
 .../src/main/resources/application.conf         |   6 +-
 pom.xml                                         |   8 +
 117 files changed, 3480 insertions(+), 1144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..aa8bcd3
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+language: java
+
+jdk: openjdk7
+
+install : mvn install -DskipTests
+
+script: mvn clean compile test

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/CONTRIBUTING.md
----------------------------------------------------------------------
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 2cbcdb9..6722d8a 100755
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -26,3 +26,5 @@ project's open source license. Whether or not you state this explicitly, by
 submitting any copyrighted material via pull request, email, or other means
 you agree to license the material under the project's open source license and
 warrant that you have the legal authority to do so.
+
+Learn more from [https://cwiki.apache.org/confluence/display/EAG/Contributing+to+Eagle](https://cwiki.apache.org/confluence/display/EAG/Contributing+to+Eagle)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-assembly/src/main/bin/eagle-create-table.rb
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-create-table.rb b/eagle-assembly/src/main/bin/eagle-create-table.rb
index 185deb9..21dc030 100644
--- a/eagle-assembly/src/main/bin/eagle-create-table.rb
+++ b/eagle-assembly/src/main/bin/eagle-create-table.rb
@@ -52,6 +52,8 @@ createEagleTable(admin, 'hbaseResourceSensitivity')
 createEagleTable(admin, 'mlmodel')
 createEagleTable(admin, 'userprofile')
 createEagleTable(admin, 'hfdsusercommandpattern')
+createEagleTable(admin, 'appCommand')
+createEagleTable(admin, 'appDefinition')
 createEagleTable(admin, 'serviceAudit')
 createEagleTable(admin, 'aggregatedef')
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
index 75b00a2..8ab290e 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
@@ -97,6 +97,8 @@ public class AlertExecutorCreationUtils {
                                                           String alertExecutorID, int numPartitions, String partitionerCls) throws Exception{
 		LOG.info("Creating alert executors with alertExecutorID: " + alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: "+ partitionerCls);
 
+        // TODO: Create sourceStreams with alertExecutorID into AlertExecutorService
+
 		PolicyPartitioner partitioner = (PolicyPartitioner)Class.forName(partitionerCls).newInstance();
 		AlertExecutor[] alertExecutors = new AlertExecutor[numPartitions];
         String[] _sourceStreams = sourceStreams.toArray(new String[0]);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
index a7e2ddb..6b64e53 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
@@ -1,11 +1,12 @@
 #
-# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md b/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
new file mode 100644
index 0000000..d44d156
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
@@ -0,0 +1,61 @@
+Eagle Declarative Streaming DSL
+===============================
+
+DSL Format
+----------
+
+	{
+		config {
+		  config.key = configValue
+		}
+
+		schema {
+		  metricStreamSchema {
+		    metric: string
+		    value: double
+		    timestamp: long
+		  }
+		}
+
+		dataflow {
+		  kafkaSource.source1 {
+		    schema = "metricStreamSchema"
+		  }
+		  kafkaSource.source2 {
+		    schema = {
+		      metric: string
+		      value: double
+		      timestamp: long
+		    }
+		  }
+		}
+	}
+
+Usage
+-----
+
+	val pipeline = Pipeline.parseResource("pipeline.conf")
+	val stream = Pipeline.compile(pipeline)
+	stream.submit[storm]
+
+Features
+--------
+* [x] Compile DSL Configure to Pipeline model
+* [x] Compile Pipeline model to Stream Execution Graph
+* [x] Submit Stream Execution Graph to actual running environment say storm
+* [x] Support Alert and Persistence for metric monitoring
+* [ ] Extensible stream module management and automatically scan and register module
+* [x] Pipeline runner CLI tool and shell script
+* [ ] Decouple pipeline compiler and scheduler into individual modules
+* [ ] Stream Pipeline Scheduler
+* [ ] Graph editor to define streaming graph in UI
+* [?] JSON/Config & Scala Case Class Mapping (https://github.com/scala/pickling)
+* [?] Raw message structure oriented programming is a little ugly, we should define a generic message/event consisting of [payload:stream/timestamp/serializer/deserializer,data:message]
+* [ ] Provide stream schema inline and send to metadata when submitting
+* [ ] UI should support specify executorId when defining new stream
+* [ ] Lack of an entity named StreamEntity for the workflow of defining topology&policy end-to-end
+* [!] Fix configuration conflict, should pass through Config instead of ConfigFactory.load() manually
+* [ ] Override application configuration with pipeline configuration
+* [ ] Refactor schema registration structure and automatically submit stream schema when submitting pipeline
+* [ ] Submit alertStream, alertExecutorId mapping to AlertExecutorService when submitting pipeline
+* [x] Supports `inputs` field to define connector
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
new file mode 100644
index 0000000..e8965f5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-data-process-parent</artifactId>
+        <groupId>eagle</groupId>
+        <version>0.3.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>eagle-stream-pipeline</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.reflections</groupId>
+            <artifactId>reflections</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-service-base</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.ow2.asm</groupId>
+                    <artifactId>asm-all</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm-all</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm-commons</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm-tree</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-storage-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-stream-process-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor_${scala.version}</artifactId>
+            <version>${akka.actor.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-testkit_${scala.version}</artifactId>
+            <version>${akka.actor.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <configuration>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <filereports>TestSuite.txt</filereports>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
new file mode 100644
index 0000000..65ab390
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline
+
+
+import org.apache.eagle.stream.pipeline.compiler.PipelineCompiler
+import org.apache.eagle.stream.pipeline.parser.PipelineParser
+import org.apache.eagle.stream.pipeline.runner.PipelineRunner
+
+object Pipeline
+  extends PipelineRunner
+  with PipelineParser
+  with PipelineCompiler
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
new file mode 100644
index 0000000..2ff81d4
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline.annotation
+
+import scala.annotation.StaticAnnotation
+
+case class Extension(extType:String) extends StaticAnnotation
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
new file mode 100644
index 0000000..df8bbe5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline.compiler
+
+
+import org.apache.eagle.datastream.core._
+import org.apache.eagle.stream.pipeline.extension.ModuleManager._
+import org.apache.eagle.stream.pipeline.parser._
+import org.apache.eagle.stream.pipeline.utils.CompileException
+
+trait PipelineCompiler {
+  def compile(pipeline:Pipeline):StreamContext = {
+    val context = new StreamContext(pipeline.config)
+    val dataflow = pipeline.dataflow
+    val dag = new StreamDAG(context.dag)
+    dataflow.getProcessors.map(buildStreamProducer(dag,_)).foreach(producer =>{
+      producer.initWith(dag.graph,pipeline.config)
+      dag.addVertex(producer)
+    })
+    dataflow.getConnectors.foreach(connector =>{
+      val from = dag.getNodeByName(connector.from).get
+      val to = dag.getNodeByName(connector.to).get
+      dag.addEdge(from,to,buildStreamConnector(from,to,dataflow,connector))
+    })
+    context
+  }
+  private def  buildStreamProducer(dag:StreamDAG,processor:Processor):StreamProducer[Any] = {
+    if(findModuleType(processor.getType)){
+      getModuleMapperByType(processor.getType).map(processor).nameAs(processor.getId).stream(processor.streamId)
+    } else {
+      throw new CompileException(s"Unknown processor type [${processor.getType}]")
+    }
+  }
+  private def buildStreamConnector(from:StreamProducer[Any],to:StreamProducer[Any],dataflow:DataFlow,connector:Connector):StreamConnector[Any,Any]={
+    var groupByIndexes:Seq[Int] = connector.groupByIndexes.orNull
+    if(groupByIndexes!=null ){
+      if(connector.groupByFields.isDefined) throw new CompileException(s"Both ${Connector.GROUP_BY_FIELD_FIELD} and ${Connector.GROUP_BY_INDEX_FIELD} is defined at same time")
+    } else if(connector.groupByFields.isDefined){
+      groupByIndexes = connector.groupByFields.get.map(dataflow.getProcessor(from.name).get.getSchema.get.indexOfAttribute)
+    }
+    if(groupByIndexes == null){
+      ShuffleConnector(from,to)
+    } else {
+      GroupbyFieldsConnector(from,to,groupByIndexes)
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
new file mode 100644
index 0000000..9c3e9d5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
@@ -0,0 +1,182 @@
+package org.apache.eagle.stream.pipeline.extension
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.typesafe.config.ConfigFactory
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider
+import org.apache.eagle.datastream.core._
+import org.apache.eagle.partition.PartitionStrategy
+import org.apache.eagle.stream.pipeline.parser.Processor
+import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+
+object ModuleManager{
+  def getModuleMapperByType(moduleType:String):ModuleMapper = {
+    classOfProcessorMapping(moduleType)
+  }
+
+  def findModuleType(moduleType:String):Boolean = classOfProcessorMapping.contains(moduleType)
+
+  val classOfProcessorMapping = Map[String,ModuleMapper](
+    "KafkaSource" -> KafkaSourceStreamProducer,
+    "KafkaSink" -> KafkaSinkStreamProducer,
+    "Alert" -> AlertStreamProducer,
+    "Persistence" -> PersistProducer,
+    "Aggregator" -> AggregatorProducer,
+    "Console" -> ConsoleStreamProducer
+  )
+}
+
+trait ModuleMapper{
+  def getType:String
+  def map(module:Processor):StreamProducer[Any]
+}
+object KafkaSourceStreamProducer extends ModuleMapper{
+  def getType = "KafkaSource"
+  override def map(module:Processor): StreamProducer[Any] = {
+    val config = module.getConfig
+    new StormSourceProducer[Any](new KafkaSourcedSpoutProvider(null).getSpout(ConfigFactory.parseMap(config.asJava)))
+  }
+}
+object KafkaSinkStreamProducer extends ModuleMapper{
+  def getType = "KafkaSink"
+  override def map(module:Processor): StreamProducer[Any] = {
+    val config = module.getConfig
+    ForeachProducer[AnyRef](KafkaSinkExecutor(config))
+  }
+}
+object ConsoleStreamProducer extends ModuleMapper{
+  override def getType: String = "Stdout"
+  override def map(module:Processor): StreamProducer[Any] = ForeachProducer[Any](m=>print(s"$m\n"))
+}
+object AlertStreamProducer extends ModuleMapper{
+  def getType:String = "Alert"
+  override def map(module:Processor): StreamProducer[Any] = {
+    val config = module.getConfig
+    val moduleId = module.getId
+    // Support create functional AlertStreamProducer constructor
+    new AlertStreamProducer (
+      upStreamNames = config.getOrElse("upStreamNames",if(module.inputIds!=null) module.inputIds.asJava else null).asInstanceOf[java.util.List[String]],
+      alertExecutorId = config.getOrElse("alertExecutorId",moduleId).asInstanceOf[String],
+      consume = config.getOrElse("consume",true).asInstanceOf[Boolean],
+      strategy = config.get("strategy") match {case Some(strategy)=> Class.forName(strategy.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy] case None => null}
+    )
+  }
+}
+
+object PersistProducer extends ModuleMapper{
+  override def getType = "Persistence"
+  override def map(module:Processor): StreamProducer[Any] = {
+    val config = module.getConfig
+    new PersistProducer(config.getOrElse("executorId",module.getId).asInstanceOf[String],StorageType.withName(config.getOrElse("storageType",null).asInstanceOf[String]))
+  }
+}
+
+object AggregatorProducer extends ModuleMapper{
+  override def getType: String = "Aggregator"
+  override def map(module:Processor): StreamProducer[Any] = {
+    val config = module.getConfig
+    new AggregateProducer(
+      upStreamNames = config.getOrElse("upStreamNames",if(module.inputIds!=null) module.inputIds.asJava else null).asInstanceOf[java.util.List[String]],
+      config.getOrElse("analyzer",module.getId).asInstanceOf[String],
+      config.get("sql") match {case Some(sql) => sql.asInstanceOf[String] case None => null },
+      config.get("strategy") match {case Some(strategy)=> Class.forName(strategy.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy] case None => null}
+    )
+  }
+}
+
+/**
+  * @todo currently supports a single topic only, should support topic selector
+  * @param config
+  */
+case class KafkaSinkExecutor(config:Map[String,AnyRef]) extends ((AnyRef) => Unit) with Serializable{
+  val TOPIC_KEY = "topic"
+  def getDefaultProps = {
+    val props = new Properties()
+    props.putAll(Map[String,AnyRef](
+      "bootstrap.servers" -> "localhost:6667",
+      "acks" -> "all",
+      "retries" -> "3",
+      "batch.size" -> "16384",
+      "linger.ms" -> "1",
+      "buffer.memory" -> "33554432",
+      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
+      "value.serializer" -> classOf[org.apache.eagle.dataproc.impl.storm.kafka.JsonSerializer].getCanonicalName
+    ).asJava)
+    props
+  }
+
+  @transient var initialized:AtomicBoolean = new AtomicBoolean(false)
+  @transient var producer:KafkaProducer[String,AnyRef] = null
+  @transient var topic:String = null
+  @transient var timeoutMs:Long = 3000
+
+  val LOG = LoggerFactory.getLogger(classOf[KafkaSinkExecutor])
+
+  private def init():Unit = {
+    if(this.initialized != null && this.initialized.get()){
+      LOG.info("Already initialized, skip")
+      return
+    }
+    this.initialized = new AtomicBoolean(false)
+    if (producer != null) {
+      LOG.info(s"Closing $producer")
+      producer.close()
+    }
+    LOG.info("Initializing and creating Kafka Producer")
+    if (config.contains(TOPIC_KEY)) {
+      this.topic = config.get(TOPIC_KEY).get.asInstanceOf[String]
+    } else {
+      throw new IllegalStateException("topic is not defined")
+    }
+    val props = getDefaultProps
+    props.putAll((config - TOPIC_KEY).asJava)
+    producer = new KafkaProducer[String, AnyRef](props)
+    LOG.info(s"Created new KafkaProducer: $producer")
+    initialized.set(true)
+  }
+
+  override def apply(value: AnyRef): Unit = {
+    if(initialized == null || !initialized.get()) init()
+    if(topic == null) throw new IllegalStateException("topic is not defined")
+    val isList = value.isInstanceOf[java.util.List[AnyRef]]
+    val record: ProducerRecord[String, AnyRef] = if(isList){
+      val list = value.asInstanceOf[java.util.List[AnyRef]]
+      if(list.size() == 1) {
+        new ProducerRecord[String, AnyRef](topic, value.asInstanceOf[java.util.List[AnyRef]].get(0))
+      }else{
+        new ProducerRecord[String, AnyRef](topic, value)
+      }
+    }else{
+      new ProducerRecord[String, AnyRef](topic,value)
+    }
+    producer.send(record,new Callback(){
+      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
+        if(exception!=null){
+          LOG.error(s"Failed to send record $value to topic: $topic",exception)
+        }
+      }
+    })
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
new file mode 100644
index 0000000..7e1f4cf
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline.parser
+
+import com.typesafe.config.Config
+import org.apache.eagle.stream.pipeline.utils.ParseException
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+import scala.collection.mutable
+
+
+class DataFlow {
+  def getInputs(id: String):Seq[Processor] = {
+    this.getConnectors.filter(_.to.equals(id)).map(c => getProcessor(c.from).get)
+  }
+
+  /**
+    * Connect if not, do nothing if already connected
+    *
+    * @param from
+    * @param to
+    */
+  def connect(from: String, to: String): Unit = {
+    val connector = Connector(from,to,null)
+    var exists = false
+    connectors.foreach(c => exists = (c.from.equals(from) && c.to.equals(to)) || exists)
+    if(!exists) addConnector(connector)
+  }
+
+  private var processors = mutable.Map[String,Processor]()
+  private var connectors = mutable.Seq[Connector]()
+  def setProcessors(processors:Seq[Processor]):Unit = {
+    processors.foreach{module =>
+      this.processors.put(module.getId,module)
+    }
+  }
+  def setProcessors(processors:mutable.Map[String,Processor]):Unit = {
+    this.processors = processors
+  }
+  def setConnectors(connectors:Seq[Connector]):Unit = {
+    connectors.foreach(connector =>{
+      this.connectors :+= connector
+    })
+  }
+  def addProcessor(module:Processor):Unit = {
+    if(contains(module)) throw new IllegalArgumentException(s"Duplicated processor id error, ${module.getId} has already been defined as ${getProcessor(module.getId)}")
+    processors.put(module.getId,module)
+  }
+
+  def contains(module:Processor):Boolean = processors.contains(module.getId)
+  def addConnector(connector:Connector):Unit = {
+    connectors :+= connector
+  }
+  def getProcessors:Seq[Processor] = processors.values.toSeq
+  def getProcessor(processorId:String):Option[Processor] = processors.get(processorId)
+  def getConnectors:Seq[Connector] = connectors
+}
+
+/**
+  * Stream Processor
+  *
+  * @param processorId
+  * @param processorType
+  * @param schema
+  * @param processorConfig
+  */
+case class Processor(var processorId:String = null,var processorType:String = null,var schema:Schema = null, var processorConfig:Map[String,AnyRef] = null) extends Serializable {
+  private[pipeline] var inputs:Seq[Processor] = null
+  private[pipeline] var inputIds:Seq[String] = null
+
+  def getId:String = processorId
+  def getType:String = processorType
+  def getConfig:Map[String,AnyRef] = processorConfig
+  def getSchema:Option[Schema] = if(schema == null) None else Some(schema)
+
+  /**
+    * @todo assume processorId as streamId
+    * @return
+    */
+  def streamId = processorId
+}
+
+case class Connector (from:String,to:String, config:Map[String,AnyRef]) extends Serializable{
+  import Connector._
+
+  def group:Option[String] = config.get(GROUP_FIELD).asInstanceOf[Option[String]]
+  def groupByFields:Option[Seq[String]] = config.get(GROUP_BY_FIELD_FIELD) match {
+    case Some(obj) => Some(obj.asInstanceOf[java.util.List[String]].asScala.toSeq)
+    case None => None
+  }
+  def groupByIndexes:Option[Seq[Int]] = config.get(GROUP_BY_INDEX_FIELD) match {
+    case Some(obj) => Some(obj.asInstanceOf[java.util.List[java.lang.Integer]].asScala.toSeq.map(Int.unbox(_)))
+    case None => None
+  }
+}
+
+object Connector{
+  val GROUP_FIELD = "grouping"
+  val GROUP_BY_FIELD_FIELD = "groupByField"
+  val GROUP_BY_INDEX_FIELD = "groupByIndex"
+}
+
+private [pipeline]
+object Processor {
+  val SCHEMA_FIELD:String = "schema"
+  val INPUTS_FIELD = "inputs"
+  def parse(processorId:String,processorType:String,context:Map[String,AnyRef], schemaSet:SchemaSet):Processor = {
+    val schema = context.get(SCHEMA_FIELD) match {
+      case Some(schemaDef) => schemaDef match {
+        case schemaId:String => schemaSet.get(schemaId).getOrElse {
+          throw new ParseException(s"Schema [$schemaId] is not found but referred by [$processorType:$processorId] in $context")
+        }
+        case schemaMap:java.util.HashMap[String,AnyRef] => Schema.parse(schemaMap.toMap)
+        case _ => throw new ParseException(s"Illegal value for schema: $schemaDef")
+      }
+      case None => null
+    }
+    val instance = new Processor(processorId,processorType,schema,context-SCHEMA_FIELD)
+    if(context.contains(INPUTS_FIELD)) instance.inputIds = context.get(INPUTS_FIELD).get.asInstanceOf[java.util.List[String]].asScala.toSeq
+    instance
+  }
+}
+
+
+trait DataFlowParser {
+  def parse(config:Config,schemaSet:SchemaSet = SchemaSet.empty()):DataFlow = {
+    val dataw = new DataFlow()
+    val map = config.root().unwrapped().toMap
+
+    // Parse processors and connectors
+    map.foreach(entry => {
+      parseSingle(entry._1,entry._2.asInstanceOf[java.util.HashMap[String,AnyRef]].toMap,dataw,schemaSet)
+    })
+    expand(dataw)
+    validate(dataw)
+    dataw
+  }
+
+  private def expand(datafw: DataFlow):Unit = {
+    datafw.getProcessors.foreach(proc =>{
+      if(proc.inputIds!=null) {
+        proc.inputIds.foreach(id => {
+          // connect if not
+          datafw.connect(id,proc.getId)
+        })
+      }
+      proc.inputs = datafw.getInputs(proc.getId)
+      proc.inputIds = proc.inputs.map(_.getId)
+    })
+  }
+
+  private def
+  validate(pipeline:DataFlow): Unit ={
+    def checkModuleExists(id:String): Unit ={
+      pipeline.getProcessor(id).orElse {
+        throw new ParseException(s"Stream [$id] is not defined before being referred")
+      }
+    }
+
+    pipeline.getConnectors.foreach {connector =>
+      checkModuleExists(connector.from)
+      checkModuleExists(connector.to)
+    }
+  }
+
+  private def
+  parseSingle(identifier:String,config:Map[String,AnyRef],dataflow:DataFlow, schemaSet: SchemaSet):Unit = {
+    Identifier.parse(identifier) match {
+      case DefinitionIdentifier(processorType) => {
+        config foreach {entry =>
+          dataflow.addProcessor(Processor.parse(entry._1, processorType,entry._2.asInstanceOf[java.util.HashMap[String, AnyRef]].toMap,schemaSet))
+        }
+      }
+      case ConnectionIdentifier(fromIds,toId) => fromIds.foreach { fromId =>
+        if(fromId.eq(toId)) throw new ParseException(s"Can't connect $fromId to $toId")
+        dataflow.addConnector(Connector(fromId,toId,config))
+      }
+      case _ => ???
+    }
+  }
+}
+
+
+private[pipeline] trait Identifier
+
+private[pipeline] case class DefinitionIdentifier(moduleType: String) extends Identifier
+private[pipeline] case class ConnectionIdentifier(fromIds: Seq[String], toId: String) extends Identifier
+
+private[pipeline] object Identifier {
+  val ConnectorFlag = "->"
+  val UnitFlagSplitPattern = "\\|"
+  val UnitFlagChar = "|"
+  val ConnectorPattern = s"([\\w-|\\s]+)\\s+$ConnectorFlag\\s+([\\w-_]+)".r
+  def parse(identifier: String): Identifier = {
+    // ${id} -> ${id}
+    ConnectorPattern.findFirstMatchIn(identifier) match {
+      case Some(matcher) => {
+        if(matcher.groupCount != 2){
+          throw new ParseException(s"Illegal connector definition: $identifier")
+        }else{
+          val source = matcher.group(1)
+          val destination = matcher.group(2)
+          if(source.contains(UnitFlagChar)) {
+            val sources = source.split(UnitFlagSplitPattern).toSeq
+            ConnectionIdentifier(sources.map{_.trim()},destination)
+          }else{
+            ConnectionIdentifier(Seq(source),destination)
+          }
+        }
+      }
+      case None => {
+        if(identifier.contains(ConnectorFlag)) throw new ParseException(s"Failed to parse $identifier")
+        DefinitionIdentifier(identifier)
+      }
+    }
+  }
+}
+
+object DataFlow extends DataFlowParser
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
new file mode 100644
index 0000000..cc1e009
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
@@ -0,0 +1,89 @@
+package org.apache.eagle.stream.pipeline.parser
+
+import java.io.File
+
+import com.typesafe.config.{Config, ConfigFactory}
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+case class Pipeline(config:Config,dataflow:DataFlow)
+
+/**
+ * Pipeline configuration parser
+ *
+ * For example:
+ *
+ * {{{
+ * <code>
+ * {
+ *    config {
+ *      execution.environment.config = someValue
+ *    }
+ *    schema {
+ *      metricStreamSchema {
+ *        metric: string
+ *        value: double
+ *        timestamp: long
+ *      }
+ *    }
+ *    dataflow {
+ *      kafkaSource.source1 {
+ *        schema = "metricStreamSchema"
+ *      }
+ *      kafkaSource.source2 {
+ *        schema = {
+ *          metric: string
+ *          value: double
+ *          timestamp: long
+ *        }
+ *      }
+ *    }
+ * }
+ * </code>
+ * }}}
+ */
+trait PipelineParser{
+  val CONFIG_FIELD = "config"
+  val SCHEMA_FIELD = "schema"
+  val DATAFLOW_FIELD = "dataflow"
+
+  /**
+   * Parses a pipeline definition from the given HOCON config.
+   *
+   * Reads the optional `config`, `schema` and `dataflow` sections; the
+   * pipeline config is merged over the default configuration loaded from
+   * the classpath (ConfigFactory.load()).
+   *
+   * @throws IllegalArgumentException if the configuration is empty
+   */
+  def parse(config:Config):Pipeline = {
+    if(config.isEmpty) throw new IllegalArgumentException("Pipeline configuration is empty")
+    var pConfig:Config = ConfigFactory.empty()
+    var pSchemaSet:SchemaSet = SchemaSet.empty()
+    // NOTE(review): dataflow stays null when no `dataflow` section is present,
+    // so Pipeline.dataflow may be null — confirm downstream tolerates this.
+    var pDataflow:DataFlow = null
+    if(config.hasPath(CONFIG_FIELD)) pConfig = config.getConfig(CONFIG_FIELD)
+    if(config.hasPath(SCHEMA_FIELD)) pSchemaSet = SchemaSet.parse(config.getConfig(SCHEMA_FIELD))
+    if(config.hasPath(DATAFLOW_FIELD)) pDataflow = DataFlow.parse(config.getConfig(DATAFLOW_FIELD),pSchemaSet)
+
+    // Merge pipeline config over base config. pConfig is initialized to
+    // ConfigFactory.empty() above and Config.getConfig never returns null,
+    // so the former null check was dead code and has been removed.
+    val baseConfig = ConfigFactory.load()
+    new Pipeline(pConfig.withFallback(baseConfig), pDataflow)
+  }
+
+  /** Parses a pipeline from an inline HOCON string. */
+  def parseString(config:String):Pipeline = parse(ConfigFactory.parseString(config))
+
+  /**
+   * Parses a pipeline from a file path (starting with "/" or "./") or,
+   * otherwise, from a classpath resource name.
+   */
+  def parseResource(resource:String):Pipeline = {
+    // TODO: Load environment, currently hard-code with storm
+    if(resource.startsWith("/") || resource.startsWith("./")){
+      parse(ConfigFactory.parseFile(new File(resource)))
+    } else{
+      parse(ConfigFactory.parseResourcesAnySyntax(getClass.getClassLoader,resource))
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
new file mode 100644
index 0000000..7653f9e
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline.parser
+
+import com.typesafe.config.Config
+
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.mutable
+
+/** Base type of all schema attribute fields; carries only the attribute name. */
+class Field(name:String) extends Serializable{
+  def getName:String = name
+}
+
+// Concrete attribute field types supported by stream schemas.
+case class StringField(name:String) extends Field(name)
+case class LongField(name:String) extends Field(name)
+case class IntegerField(name:String) extends Field(name)
+case class BooleanField(name:String) extends Field(name)
+case class FloatField(name:String) extends Field(name)
+case class DoubleField(name:String) extends Field(name)
+// Datetime additionally carries its parse/format pattern.
+case class DatetimeField(name:String,format:String) extends Field(name)
+
+/** Factory helpers producing typed Field instances from a declared type name. */
+object Field{
+  def string(name:String) = StringField(name)
+  def long(name:String) = LongField(name)
+  def integer(name:String) = IntegerField(name)
+  def boolean(name:String) = BooleanField(name)
+  def float(name:String) = FloatField(name)
+  def double(name:String) = DoubleField(name)
+  def datetime(name:String)(format:String) = DatetimeField(name,format)
+
+  // Dispatch table from config type name to field constructor.
+  // (datetime is intentionally absent: it additionally requires a format.)
+  private val constructorsByTypeName: Map[String, String => Field] = Map(
+    "string"  -> string _,
+    "long"    -> long _,
+    "integer" -> integer _,
+    "boolean" -> boolean _,
+    "float"   -> float _,
+    "double"  -> double _
+  )
+
+  /** Resolves a field from its type name, e.g. Field("metric","string"). */
+  def apply(name:String,typeName:String):Field =
+    constructorsByTypeName.get(typeName) match {
+      case Some(construct) => construct(name)
+      case None => throw new UnsupportedOperationException(s"""Unknown attribute type $typeName for attribute "$name"""")
+    }
+}
+
+case class Schema(attributes:Seq[Field]) extends Serializable{
+  /** Looks up an attribute by name; None when absent or `attributes` is null. */
+  def getAttribute(attributeName:String):Option[Field]={
+    if(attributes != null){
+      // Fixed: use value equality (==) instead of reference equality (eq).
+      // `eq` compares references and fails for equal but non-interned strings,
+      // e.g. attribute names read from configuration at runtime.
+      attributes.find(_.getName == attributeName)
+    }else None
+  }
+
+  /** Index of the named attribute, or -1 when absent or `attributes` is null. */
+  def indexOfAttribute(attributeName:String):Int = {
+    if(attributes != null){
+      attributes.indexWhere(_.getName == attributeName)
+    } else -1
+  }
+
+  /**
+   * Index of the named attribute.
+   *
+   * Fixed: previously this only threw when `attributes` was null and silently
+   * returned -1 for a missing name, contradicting both its name and its
+   * error message; it now throws whenever the attribute is not found.
+   *
+   * @throws IllegalArgumentException when the attribute is not defined
+   */
+  @throws[IllegalArgumentException]
+  def indexOfAttributeOrException(attributeName:String):Int = {
+    val index = if(attributes != null) attributes.indexWhere(_.getName == attributeName) else -1
+    if(index < 0) throw new IllegalArgumentException(s"Attribute [$attributeName] is not found in stream $this")
+    index
+  }
+}
+
+object Schema{
+  /**
+   * Builds a schema from a name -> type-name map, e.g. Map("metric" -> "string").
+   * Complex (nested map) attribute types are not supported yet.
+   */
+  def parse(map:Map[String,AnyRef]):Schema = {
+    new Schema(map.keys.map {attributeName =>
+      map(attributeName) match{
+        case simpleType:String => Field(attributeName,simpleType)
+        // NOTE: generic type args are erased at runtime; this match only checks java.util.Map.
+        case complexType:java.util.Map[String,AnyRef] => throw new IllegalStateException(s"ComplexType attribute definition is not supported yet [$attributeName : $complexType] ")
+        case otherType@_ => throw new IllegalStateException(s"Illegal attribute definition $attributeName : $otherType")
+      }
+    }.toSeq)
+  }
+
+  /**
+   * Builds a schema from heterogeneous attribute definitions: (name, type-name)
+   * tuples with a String or Symbol type, or ready-made Field instances.
+   * @param attributes supports (String, String), (String, Symbol) and Field elements.
+   * @return the assembled schema
+   */
+  def build(attributes:Seq[AnyRef]):Schema = {
+    new Schema(attributes.map{ a:AnyRef =>
+      a match {
+        // NOTE: tuple element types are erased at runtime; this only checks Tuple2.
+        case t:(String, AnyRef) => {
+          t._2 match {
+            case v:String => Field(t._1,v)
+            case v:Symbol => Field(t._1,v.name)
+            case _ => throw new UnsupportedOperationException(s"Illegal attribute definition $a")
+          }
+        }
+        case t:Field => t
+        case _ => throw new UnsupportedOperationException(s"Illegal attribute definition $a")
+      }
+    })
+  }
+}
+
+private[pipeline] class StreamUndefinedException(message:String = "stream is not defined",throwable: Throwable = null) extends Exception(message,throwable)
+
+/** Mutable registry mapping schema id -> Schema; duplicate definitions are rejected. */
+private[pipeline] class SchemaSet {
+  private val processorSchemaCache = mutable.Map[String,Schema]()
+
+  /**
+   * Registers a schema under `schemaId`.
+   * @throws IllegalArgumentException if the id is already defined
+   */
+  def set(schemaId:String,schema:Schema):Unit = {
+    if(processorSchemaCache.contains(schemaId)) throw new IllegalArgumentException(
+      s"""
+         |Failed to define schema for $schemaId as $schema,
+         |because it has been defined as ${processorSchemaCache(schemaId)},
+         |please call updateSchema(processorId,schema) instead
+       """.stripMargin) // Fixed: without stripMargin the literal '|' margins leaked into the message.
+    processorSchemaCache.put(schemaId,schema)
+  }
+
+  /** Returns the schema registered under `schemaId`, if any. */
+  def get(schemaId:String):Option[Schema] = processorSchemaCache.get(schemaId)
+}
+
+private[pipeline] object SchemaSet{
+  def empty() = new SchemaSet()
+
+  /**
+   * Parses a schema section mapping schema id to attribute definitions.
+   *
+   * For example:
+   *
+   * <code>
+   *    {
+   *      metricStream {
+   *        metric: string
+   *        value: double
+   *        timestamp: long
+   *      }
+   *    }
+   * </code>
+   * @param schemaConfig schema id -> (attribute name -> type name) map
+   * @return the parsed schema set
+   */
+  def parse(schemaConfig:Map[String,AnyRef]):SchemaSet = {
+    val schemas = new SchemaSet()
+    schemaConfig.foreach(entry =>{
+      // Fixed: cast to the java.util.Map interface rather than the concrete
+      // java.util.HashMap — Config.unwrapped() only guarantees the interface.
+      schemas.set(entry._1,Schema.parse(entry._2.asInstanceOf[java.util.Map[String,AnyRef]].toMap))
+    })
+    schemas
+  }
+
+  /** Parses a schema set directly from a HOCON config object. */
+  def parse(config:Config):SchemaSet = parse(config.root().unwrapped().asInstanceOf[java.util.Map[String,AnyRef]].toMap)
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
new file mode 100644
index 0000000..1c964e1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
@@ -0,0 +1,115 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.eagle.stream.pipeline.runner
+
+import java.util
+
+import com.typesafe.config.Config
+import org.apache.commons.cli.{CommandLine, Options}
+import org.apache.eagle.dataproc.util.ConfigOptionParser
+import org.apache.eagle.datastream.ExecutionEnvironments.storm
+import org.apache.eagle.datastream.core.ExecutionEnvironment
+import org.apache.eagle.stream.pipeline.compiler.PipelineCompiler
+import org.apache.eagle.stream.pipeline.parser.PipelineParser
+import org.slf4j.LoggerFactory
+
+import scala.reflect.runtime.{universe => ru}
+
+/** Parses, compiles and submits pipelines; also serves as the CLI entry point. */
+trait PipelineRunner extends PipelineParser with PipelineCompiler{
+  import PipelineCLIOptionParser._
+  // Fixed copy-paste slip: the logger was named "PipelineCLIOptionParser".
+  private val LOG = LoggerFactory.getLogger(classOf[PipelineRunner])
+
+  /** Parses the resource, compiles it and submits to environment T. */
+  def submit[T <: ExecutionEnvironment](resource:String)(implicit typeTag:ru.TypeTag[T]) =
+    compile(parseResource(resource)).submit[T]
+  def submit(resource:String,clazz:Class[ExecutionEnvironment]) =
+    compile(parseResource(resource)).submit(clazz)
+  def submit(pipelineConfig:Config,clazz:Class[ExecutionEnvironment]) =
+    compile(parse(pipelineConfig)).submit(clazz)
+  def submit[T <: ExecutionEnvironment](pipelineConfig:Config)(implicit typeTag: ru.TypeTag[T]) =
+    compile(parse(pipelineConfig)).submit[T]
+
+  // NOTE(review): the result of load(args) is discarded — presumably loading
+  // has side effects (system properties); confirm, otherwise this is a no-op.
+  def apply(args:Array[String]):PipelineRunner = {
+    new ConfigOptionParser().load(args)
+    this
+  }
+
+  /** CLI entry: requires --pipeline; submits to the (currently hard-coded) storm environment. */
+  def main(args: Array[String]): Unit = {
+    val config = PipelineCLIOptionParser.load(args)
+    if(config.hasPath(PIPELINE_CONFIG_KEY)) {
+      submit[storm](config.getString(PIPELINE_CONFIG_KEY))
+    } else {
+      sys.error(
+        s"""
+           |Error: --$PIPELINE_OPT_KEY is required
+           |$USAGE
+         """.stripMargin)
+    }
+  }
+}
+
+/**
+ * CLI option parser for the pipeline runner.
+ *
+ * Maps --pipeline to "pipeline.config" and --conf to either "config.file"
+ * (value contains '/') or "config.resource" (classpath resource name).
+ */
+private[runner] object PipelineCLIOptionParser extends ConfigOptionParser{
+  val LOG = LoggerFactory.getLogger("PipelineCLIOptionParser")
+  val PIPELINE_OPT_KEY="pipeline"
+
+  val PIPELINE_CONFIG_KEY="pipeline.config"
+
+  val CONFIG_OPT_KEY="conf"
+  val CONFIG_RESOURCE_KEY="config.resource"
+  val CONFIG_FILE_KEY="config.file"
+  val USAGE =
+    """
+      |Usage: java org.apache.eagle.stream.pipeline.Pipeline [options]
+      |
+      |Options:
+      |   --pipeline   pipeline configuration
+      |   --conf       common configuration
+      |   --env        storm (support spark, etc later)
+      |   --mode       local/remote/cluster
+    """.stripMargin
+
+  /** Extends the base options with --pipeline and --conf. */
+  override protected def options(): Options = {
+    val options = super.options()
+    options.addOption(PIPELINE_OPT_KEY, true, "Pipeline configuration file")
+    options.addOption(CONFIG_OPT_KEY, true, "Config properties file")
+    options
+  }
+
+  /** Translates parsed CLI options into the config key/value map. */
+  override protected def parseCommand(cmd: CommandLine): util.Map[String, String] = {
+    val map = super.parseCommand(cmd)
+
+    if (cmd.hasOption(PIPELINE_OPT_KEY)) {
+      val pipelineConf = cmd.getOptionValue(PIPELINE_OPT_KEY)
+      if(pipelineConf == null){
+        throw new IllegalArgumentException(s"--$PIPELINE_OPT_KEY should not be null")
+      } else {
+        LOG.info(s"Set $PIPELINE_CONFIG_KEY as $pipelineConf")
+        map.put(PIPELINE_CONFIG_KEY, pipelineConf)
+      }
+    }
+
+    if(cmd.hasOption(CONFIG_OPT_KEY)){
+      val commonConf = cmd.getOptionValue(CONFIG_OPT_KEY)
+      // A value containing '/' is treated as a file path, otherwise as a classpath resource.
+      if(commonConf.contains("/")){
+        LOG.info(s"Set $CONFIG_FILE_KEY as $commonConf")
+        map.put(CONFIG_FILE_KEY, commonConf)
+      }else {
+        // Fixed: message was missing "as", inconsistent with the sibling log lines.
+        LOG.info(s"Set $CONFIG_RESOURCE_KEY as $commonConf")
+        map.put(CONFIG_RESOURCE_KEY, commonConf)
+      }
+    }
+    map
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
new file mode 100644
index 0000000..1102a33
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline.utils
+
+/** Raised when a pipeline definition cannot be parsed. Optional cause enables exception chaining. */
+class ParseException(message:String, cause:Throwable = null) extends Exception(message,cause)
+/** Raised when a parsed pipeline cannot be compiled. Optional cause enables exception chaining. */
+class CompileException(message:String, cause:Throwable = null) extends Exception(message,cause)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
new file mode 100644
index 0000000..d285a6f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	"eagleProps" : {
+		"dataJoinPollIntervalSec" : 30
+		"mailHost" : "smtp.server.host"
+		"mailSmtpPort":"25"
+		"mailDebug" : "true"
+		"eagleService": {
+			"host": "localhost"
+			"port": 38080
+			"username": "admin"
+			"password": "secret"
+		}
+	}
+	"dynamicConfigSource" : {
+		"enabled" : true
+		"initDelayMillis" : 0
+		"delayMillis" : 30000
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
new file mode 100644
index 0000000..4250681
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ./eagle-pipeline.sh --pipeline [pipeline-definition-config] --config [base-configuration]
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c8a4f46
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
new file mode 100644
index 0000000..8bd4fd3
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	config {
+		alertExecutorConfigs {
+			defaultAlertExecutor  {
+				"parallelism" : 1
+				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+				"needValidation" : "true"
+			}
+		}
+		eagleProps  {
+			"site" : "sandbox"
+			"dataSource": "eventSource"
+			"dataJoinPollIntervalSec" : 30
+			"mailHost" : "mail.host.com"
+			"mailSmtpPort":"25"
+			"mailDebug" : "true"
+			"eagleService": {
+				"host": "localhost"
+				"port": 38080
+				"username": "admin"
+				"password": "secret"
+			}
+		}
+		dynamicConfigSource  {
+			"enabled" : true
+			"initDelayMillis" : 0
+			"delayMillis" : 30000
+		}
+	}
+
+	schema {
+		metricStreamSchema {
+			metric: string
+			value: double
+			timestamp: long
+		}
+	}
+
+	dataflow {
+		KafkaSource.metricStream_1 {
+			parallism = 1000
+			topic = "metric_event_1"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+			schema = "metricStreamSchema"
+		}
+
+		KafkaSource.metricStream_2 {
+			parallism = 1000
+			topic = "metric_event_2"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.metricStream_3{
+			parallism = 1000
+			topic = "metric_event_3"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+			schema = "metricStreamSchema"
+		}
+
+		KafkaSink.metricStore {
+			schema = "metricStreamSchema"
+			parallism = 1000
+			topic = "metric_event_2"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		Alert.alert {
+			upStreamNames = [metricStream_1,metricStream_2]
+			alertExecutorId = defaultAlertExecutor
+		}
+
+//		aggregator.aggreator {
+//			executor = "aggreationExecutor"
+//		}
+
+		metricStream_1|metricStream_2 -> alert {
+			group = shuffle
+		}
+
+		metricStream_1|metricStream_2 -> metricStore {
+			group = shuffle
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
new file mode 100644
index 0000000..f458464
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	config {
+		envContextConfig {
+			"env" : "storm"
+			"mode" : "local"
+			"topologyName" : "dsl-based-topology"
+		}
+		eagleProps  {
+			"site" : "sandbox"
+			"dataSource": "eventSource"
+			"dataJoinPollIntervalSec" : 30
+			"mailHost" : "mail.host.com"
+			"mailSmtpPort":"25"
+			"mailDebug" : "true"
+			"eagleService": {
+				"host": "localhost"
+				"port": 38080
+				"username": "admin"
+				"password": "secret"
+			}
+		}
+		dynamicConfigSource  {
+			"enabled" : true
+			"initDelayMillis" : 0
+			"delayMillis" : 30000
+		}
+	}
+
+	dataflow {
+		KafkaSource.metricStream_1 {
+			parallism = 1000
+			topic = "metric_event_1"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.metricStream_2 {
+			parallism = 1000
+			topic = "metric_event_2"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.metricStream_3{
+			parallism = 1000
+			topic = "metric_event_3"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		Console.printer {}
+
+		metricStream_1|metricStream_2|metricStream_3 -> printer {
+			grouping = shuffle
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
new file mode 100644
index 0000000..2ee316c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	config {
+		envContextConfig {
+			"env" : "storm"
+			"mode" : "local"
+			"topologyName" : "dsl-based-topology"
+			"parallelismConfig" : {
+				"kafkaMsgConsumer" : 1
+			}
+		}
+		alertExecutorConfigs {
+			defaultAlertExecutor  {
+				"parallelism" : 1
+				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+				"needValidation" : "true"
+			}
+		}
+		eagleProps  {
+			"site" : "sandbox"
+			"dataSource": "HADOOP"
+			"dataJoinPollIntervalSec" : 30
+			"mailHost" : "atom.corp.ebay.com"
+			"mailSmtpPort":"25"
+			"mailDebug" : "true"
+			"eagleService": {
+				"host": "localhost"
+				"port": 38080
+				"username": "admin"
+				"password": "secret"
+			}
+		}
+		dynamicConfigSource  {
+			"enabled" : true
+			"initDelayMillis" : 0
+			"delayMillis" : 30000
+		}
+	}
+
+	schema {
+//		JmxStreamOne {
+//			attributes {
+//				metric: string
+//				value: double
+//				timestamp: long
+//			}
+//			alertExecutorId = [defaultAlertExecutor,anotherAlertExecutor]
+//		}
+		JmxStreamOne {
+			metric: string
+			value: double
+			timestamp: long
+		}
+	}
+
+	dataflow {
+		KafkaSource.JmxStreamOne {
+			parallism = 1000
+			topic = "metric_event_1"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.JmxStreamTwo {
+			parallism = 1000
+			topic = "metric_event_2"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.JmxStreamThree{
+			parallism = 1000
+			topic = "metric_event_3"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		Console.printer {
+			format = "%s"
+		}
+
+		KafkaSink.metricStore {
+			topic = "metric_event_persist"
+		}
+
+//		KafkaSink.alertStore {
+//			"topic" = "alert_persist"
+//			"bootstrap.servers" = "localhost:6667"
+//		}
+
+		Alert.alert {
+			inputs = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
+
+			upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
+			alertExecutorId = defaultAlertExecutor
+		}
+
+//		Aggregator.aggreator {
+//			upStreamNames = []
+//			analyzerId = ""
+//			cepQl = ""
+//			strategy = ""
+//		}
+
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alert {
+			grouping = shuffle
+		}
+
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
+			grouping = shuffle
+		}
+
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
+			grouping = shuffle
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
new file mode 100644
index 0000000..9e297ee
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	config {
+		envContextConfig {
+			"env" : "storm"
+			"mode" : "local"
+			"topologyName" : "dsl-topology"
+			"parallelismConfig" : {
+				"kafkaMsgConsumer" : 1
+			}
+		}
+		alertExecutorConfigs {
+			defaultAlertExecutor  {
+				"parallelism" : 1
+				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+				"needValidation" : "true"
+			}
+		}
+		eagleProps {
+			"site" : "sandbox"
+			"dataSource": "HADOOP"
+		}
+	}
+	
+	dataflow {
+		KafkaSource.JmxStreamOne {
+			parallism = 1000
+			topic = "metric_event_1"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.JmxStreamTwo {
+			parallism = 1000
+			topic = "metric_event_2"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.JmxStreamThree{
+			parallism = 1000
+			topic = "metric_event_3"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		Console.printer {
+			format = "%s"
+		}
+
+		KafkaSink.metricStore {
+			topic = "metric_event_persist"
+		}
+
+//		KafkaSink.aggSink {
+//			topic = "metric_agg_persist"
+//		}
+
+		Alert.defaultAlertExecutor {
+			// upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
+			// alertExecutorId = defaultAlertExecutor
+		}
+
+//		Aggregator.Aggregator{ sql = """
+//				@info("query")
+//				from JmxStreamOne[value > 100.0] select * insert into OutputStream;
+//			"""
+//		}
+//		JmxStreamOne -> Aggregator{
+//			grouping = shuffle
+//		}
+//		Aggregator -> aggSink{
+//			grouping = shuffle
+//		}
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor {
+			grouping = shuffle
+		}
+
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
+			grouping = shuffle
+		}
+
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
+			grouping = shuffle
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
new file mode 100644
index 0000000..7b552da
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
@@ -0,0 +1,37 @@
+package org.apache.eagle.stream.pipeline
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{FlatSpec, Matchers}
+
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+class ConfigSpec extends FlatSpec with Matchers{ // Verifies typesafe-config withFallback precedence semantics
+  "Config" should "be overrode correctly" in {
+    val conf1 = ConfigFactory.parseString(
+      """
+        |value=1
+      """.stripMargin)
+    val conf2 = ConfigFactory.parseString(
+      """
+        |value=2
+      """.stripMargin)
+    val conf3 = conf1.withFallback(conf2) // receiver wins: conf1's keys override conf2's
+    val conf4 = conf2.withFallback(conf1) // receiver wins: conf2's keys override conf1's
+    conf3.getInt("value") should be(1)
+    conf4.getInt("value") should be(2)
+  }
+}