You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/01/20 07:48:24 UTC

[4/4] incubator-eagle git commit: EAGLE-130 Eagle Pipeline DSL: Parser, Compiler, Runner

EAGLE-130 Eagle Pipeline DSL: Parser, Compiler, Runner

https://issues.apache.org/jira/browse/EAGLE-130

Compile DSL Configure to Pipeline model
Compile Pipeline model to Stream Execution Graph
Submit Stream Execution Graph to actual running environment say storm
Support Alert and Persistence for metric monitoring
Pipeline runner CLI tool and shell script
Decouple pipeline compiler and scheduler into individual modules
Fix configuration conflict, should pass through Config instead of
ConfigFactory.load() manually
Override application configuration with pipeline configuration
Supports inputs field to define connector

Author: @haoch <ha...@apache.org>
Reviewer: @haoch <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c1485aac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c1485aac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c1485aac

Branch: refs/heads/master
Commit: c1485aac543cff1245b7eab359a89d60c35424b1
Parents: f6c63e7
Author: Hao Chen <ha...@apache.org>
Authored: Wed Jan 20 14:47:24 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Jan 20 14:47:24 2016 +0800

----------------------------------------------------------------------
 .travis.yml                                     |  22 ++
 CONTRIBUTING.md                                 |   2 +
 .../src/main/bin/eagle-create-table.rb          |   2 +
 .../executor/AlertExecutorCreationUtils.java    |   2 +
 .../src/test/resources/str.siddhiext            |  13 +-
 .../eagle-stream-pipeline/README.md             |  61 +++++
 .../eagle-stream-pipeline/pom.xml               | 154 ++++++++++++
 .../apache/eagle/stream/pipeline/Pipeline.scala |  27 +++
 .../stream/pipeline/annotation/Extension.scala  |  21 ++
 .../pipeline/compiler/PipelineCompiler.scala    |  61 +++++
 .../pipeline/extension/ModuleManager.scala      | 182 ++++++++++++++
 .../eagle/stream/pipeline/parser/DataFlow.scala | 235 +++++++++++++++++++
 .../eagle/stream/pipeline/parser/Pipeline.scala |  89 +++++++
 .../eagle/stream/pipeline/parser/Schema.scala   | 152 ++++++++++++
 .../stream/pipeline/runner/PipelineRunner.scala | 115 +++++++++
 .../stream/pipeline/utils/Exceptions.scala      |  20 ++
 .../src/test/resources/application.conf         |  34 +++
 .../src/test/resources/eagle-pipeline.sh        |  20 ++
 .../src/test/resources/log4j.properties         |  19 ++
 .../src/test/resources/pipeline_1.conf          | 131 +++++++++++
 .../src/test/resources/pipeline_2.conf          |  93 ++++++++
 .../src/test/resources/pipeline_3.conf          | 152 ++++++++++++
 .../src/test/resources/pipeline_4.conf          | 122 ++++++++++
 .../eagle/stream/pipeline/ConfigSpec.scala      |  37 +++
 .../eagle/stream/pipeline/DataFlowSpec.scala    | 113 +++++++++
 .../eagle/stream/pipeline/PipelineSpec.scala    |  57 +++++
 .../aggregate/AggregateExecutorFactory.java     |  13 +-
 .../impl/aggregate/SimpleAggregateExecutor.java |  11 +-
 .../impl/storm/kafka/JsonSerializer.java        |  58 +++++
 .../storm/kafka/KafkaSourcedSpoutProvider.java  |  42 ++--
 .../storm/kafka/KafkaSourcedSpoutScheme.java    |   2 +-
 .../eagle/datastream/ExecutionEnvironment.scala | 134 -----------
 .../datastream/ExecutionEnvironments.scala      | 140 +++++++++++
 .../datastream/core/ExecutionEnvironment.scala  |  67 ++----
 .../core/StreamAggregateExpansion.scala         |   4 +-
 .../datastream/core/StreamAlertExpansion.scala  |  59 ++++-
 .../eagle/datastream/core/StreamBuilder.scala   |  95 ++++++++
 .../eagle/datastream/core/StreamDAG.scala       |   6 +-
 .../datastream/core/StreamDAGTransformer.scala  |   1 +
 .../core/StreamParallelismConfigExpansion.scala |  10 +-
 .../eagle/datastream/core/StreamProducer.scala  | 104 +++++---
 .../eagle/datastream/core/StreamProtocol.scala  |  11 +-
 .../datastream/core/StreamSourceBuilder.scala   |  16 ++
 .../datastream/storm/AbstractStreamBolt.scala   |   2 +-
 .../datastream/storm/IterableStreamSpout.scala  |   6 +-
 .../datastream/storm/IteratorStreamSpout.scala  |  65 +++++
 .../storm/JsonMessageDeserializer.scala         |  14 +-
 .../datastream/storm/StormBoltFactory.scala     |  13 +-
 .../storm/StormExecutionEnvironment.scala       |   7 +-
 .../storm/StormFlatFunctionWrapper.scala        |  45 ++++
 .../storm/StormFlatMapperWrapper.scala          |  44 ++++
 .../datastream/storm/StormSpoutFactory.scala    |   4 +-
 .../storm/StormTopologyCompiler.scala           |   2 +-
 .../storm/StormTopologyExecutorImpl.scala       |   9 +-
 .../datastream/TestExecutionEnvironment.scala   |   2 +-
 .../eagle/datastream/TestTypeSafedDSL.scala     |  28 +++
 .../util/AbstractConfigOptionParser.java        |   6 +-
 .../eagle/dataproc/util/ConfigOptionParser.java |   3 +
 .../org/apache/eagle/datastream/Collector.scala |   4 +-
 .../apache/eagle/datastream/FlatMapper.scala    |   4 +
 eagle-core/eagle-data-process/pom.xml           |   6 +-
 .../eagle/alert/entity/AlertExecutorEntity.java |   3 +
 .../policy/siddhi/SiddhiPolicyEvaluator.java    |  46 ++--
 .../siddhi/SiddhiStreamMetadataUtils.java       |   2 +-
 .../src/main/resources/eagle.siddhiext          |  17 ++
 .../src/main/resources/str.siddhiext            |  13 +-
 .../eagle/storage/jdbc/TestJdbcStorage.java     |   2 +-
 eagle-samples/pom.xml                           |  31 +--
 .../entity/FileSensitivityAPIEntity.java        |  52 ++++
 .../eagle/security/entity/FileStatusEntity.java | 176 ++++++++++++++
 .../security/entity/HbaseResourceEntity.java    | 105 +++++++++
 .../HbaseResourceSensitivityAPIEntity.java      |  47 ++++
 .../entity/HdfsUserCommandPatternEntity.java    |  82 +++++++
 .../security/entity/HiveResourceEntity.java     | 104 ++++++++
 .../HiveResourceSensitivityAPIEntity.java       |  53 +++++
 .../eagle/security/entity/IPZoneEntity.java     |  52 ++++
 .../entity/SecurityEntityRepository.java        |  32 +++
 .../security/hbase/HbaseResourceEntity.java     | 105 ---------
 .../HbaseResourceSensitivityAPIEntity.java      |  47 ----
 .../hbase/HbaseSecurityEntityRepository.java    |  26 --
 .../hdfs/entity/FileSensitivityAPIEntity.java   |  52 ----
 .../security/hdfs/entity/FileStatusEntity.java  | 176 --------------
 .../entity/HDFSSecurityEntityRepository.java    |  27 ---
 .../entity/HdfsUserCommandPatternEntity.java    |  82 -------
 .../security/hdfs/entity/IPZoneEntity.java      |  52 ----
 .../hive/entity/HiveResourceEntity.java         | 104 --------
 .../HiveResourceSensitivityAPIEntity.java       |  53 -----
 .../entity/HiveSecurityEntityRepository.java    |  25 --
 ...baseResourceSensitivityDataJoinExecutor.java |   2 +-
 .../HbaseResourceSensitivityPollingJob.java     |   5 +-
 .../hbase/HbaseMetadataBrowseWebResource.java   |   2 +-
 .../hbase/HbaseMetadataBrowseWebResponse.java   |   2 +-
 .../hbase/HbaseSensitivityResourceService.java  |   2 +-
 .../dao/HbaseMetadataAccessConfigDAOImpl.java   |   3 +-
 .../FileSensitivityDataJoinExecutor.java        |   3 +-
 .../HdfsUserCommandPatternByDBImpl.java         |   2 +-
 .../HdfsUserCommandPatternByFileImpl.java       |   2 +-
 .../auditlog/HdfsUserCommandPatternDAO.java     |   2 +-
 .../auditlog/HdfsUserCommandReassembler.java    |   2 +-
 .../auditlog/IPZoneDataJoinExecutor.java        |   3 +-
 .../timer/FileSensitivityPollingJob.java        |   4 +-
 .../auditlog/timer/IPZonePollingJob.java        |   3 +-
 .../TestHdfsUserCommandPatternByDB.java         |   2 +-
 .../TestHdfsUserCommandPatternByFile.java       |   3 +-
 .../hdfs/HDFSResourceSensitivityDataJoiner.java |   2 +-
 .../hdfs/HDFSResourceSensitivityService.java    |   2 +-
 .../hdfs/rest/HDFSResourceWebResource.java      |   2 +-
 .../hdfs/rest/HDFSResourceWebResponse.java      |   2 +-
 .../dao/HiveSensitivityMetadataDAOImpl.java     |   2 +-
 .../hive/res/HiveMetadataBrowseWebResource.java |   2 +-
 .../hive/res/HiveMetadataBrowseWebResponse.java |   2 +-
 ...HiveResourceSensitivityDataJoinExecutor.java |   2 +-
 .../HiveResourceSensitivityPollingJob.java      |   3 +-
 eagle-topology-assembly/pom.xml                 |   5 +
 eagle-webservice/pom.xml                        |   5 +
 .../src/main/resources/application.conf         |   6 +-
 pom.xml                                         |   8 +
 117 files changed, 3480 insertions(+), 1144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
new file mode 100644
index 0000000..aa8bcd3
--- /dev/null
+++ b/.travis.yml
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+language: java
+
+jdk: openjdk7
+
+install : mvn install -DskipTests
+
+script: mvn clean compile test

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/CONTRIBUTING.md
----------------------------------------------------------------------
diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md
index 2cbcdb9..6722d8a 100755
--- a/CONTRIBUTING.md
+++ b/CONTRIBUTING.md
@@ -26,3 +26,5 @@ project's open source license. Whether or not you state this explicitly, by
 submitting any copyrighted material via pull request, email, or other means
 you agree to license the material under the project's open source license and
 warrant that you have the legal authority to do so.
+
+Learn more from [https://cwiki.apache.org/confluence/display/EAG/Contributing+to+Eagle](https://cwiki.apache.org/confluence/display/EAG/Contributing+to+Eagle)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-assembly/src/main/bin/eagle-create-table.rb
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-create-table.rb b/eagle-assembly/src/main/bin/eagle-create-table.rb
index 185deb9..21dc030 100644
--- a/eagle-assembly/src/main/bin/eagle-create-table.rb
+++ b/eagle-assembly/src/main/bin/eagle-create-table.rb
@@ -52,6 +52,8 @@ createEagleTable(admin, 'hbaseResourceSensitivity')
 createEagleTable(admin, 'mlmodel')
 createEagleTable(admin, 'userprofile')
 createEagleTable(admin, 'hfdsusercommandpattern')
+createEagleTable(admin, 'appCommand')
+createEagleTable(admin, 'appDefinition')
 createEagleTable(admin, 'serviceAudit')
 createEagleTable(admin, 'aggregatedef')
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
index 75b00a2..8ab290e 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
@@ -97,6 +97,8 @@ public class AlertExecutorCreationUtils {
                                                           String alertExecutorID, int numPartitions, String partitionerCls) throws Exception{
 		LOG.info("Creating alert executors with alertExecutorID: " + alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: "+ partitionerCls);
 
+        // TODO: Create sourceStreams with alertExecutorID into AlertExecutorService
+
 		PolicyPartitioner partitioner = (PolicyPartitioner)Class.forName(partitionerCls).newInstance();
 		AlertExecutor[] alertExecutors = new AlertExecutor[numPartitions];
         String[] _sourceStreams = sourceStreams.toArray(new String[0]);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
index a7e2ddb..6b64e53 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
@@ -1,11 +1,12 @@
 #
-# Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
 #
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md b/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
new file mode 100644
index 0000000..d44d156
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
@@ -0,0 +1,61 @@
+Eagle Declarative Streaming DSL
+===============================
+
+DSL Format
+----------
+
+	{
+		config {
+		  config.key = configValue
+		}
+
+		schema {
+		  metricStreamSchema {
+		    metric: string
+		    value: double
+		    timestamp: long
+		  }
+		}
+
+		dataflow {
+		  kafkaSource.source1 {
+		    schema = "metricStreamSchema"
+		  }
+		  kafkaSource.source2 {
+		    schema = {
+		      metric: string
+		      value: double
+		      timestamp: long
+		    }
+		  }
+		}
+	}
+
+Usage
+-----
+
+	val pipeline = Pipeline.parseResource("pipeline.conf")
+	val stream = Pipeline.compile(pipeline)
+	stream.submit[storm]
+
+Features
+--------
+* [x] Compile DSL Configure to Pipeline model
+* [x] Compile Pipeline model to Stream Execution Graph
+* [x] Submit Stream Execution Graph to actual running environment say storm
+* [x] Support Alert and Persistence for metric monitoring
+* [ ] Extensible stream module management and automatically scan and register module
+* [x] Pipeline runner CLI tool and shell script
+* [ ] Decouple pipeline compiler and scheduler into individual modules
+* [ ] Stream Pipeline Scheduler
+* [ ] Graph editor to define streaming graph in UI
+* [?] JSON/Config & Scala Case Class Mapping (https://github.com/scala/pickling)
+* [?] Raw message structure oriented programing is a little ugly, we should define a generic message/event consist of [payload:stream/timestamp/serializer/deserializer,data:message]
+* [ ] Provide stream schema inline and send to metadata when submitting
+* [ ] UI should support specify executorId when defining new stream
+* [ ] Lack of a entity named StreamEntity for the workflow of defining topology&policy end-to-end
+* [!] Fix configuration conflict, should pass through Config instead of ConfigFactory.load() manually
+* [ ] Override application configuration with pipeline configuration
+* [ ] Refactor schema registration structure and automatically submit stream schema when submitting pipeline
+* [ ] Submit alertStream, alertExecutorId mapping to AlertExecutorService when submitting pipeline
+* [x] Supports `inputs` field to define connector
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
new file mode 100644
index 0000000..e8965f5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
@@ -0,0 +1,154 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-data-process-parent</artifactId>
+        <groupId>eagle</groupId>
+        <version>0.3.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>eagle-stream-pipeline</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.reflections</groupId>
+            <artifactId>reflections</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-service-base</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.ow2.asm</groupId>
+                    <artifactId>asm-all</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm-all</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm-commons</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>asm</groupId>
+                    <artifactId>asm-tree</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-storage-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>eagle</groupId>
+            <artifactId>eagle-stream-process-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor_${scala.version}</artifactId>
+            <version>${akka.actor.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-testkit_${scala.version}</artifactId>
+            <version>${akka.actor.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <configuration>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <filereports>TestSuite.txt</filereports>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
new file mode 100644
index 0000000..65ab390
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline
+
+
+import org.apache.eagle.stream.pipeline.compiler.PipelineCompiler
+import org.apache.eagle.stream.pipeline.parser.PipelineParser
+import org.apache.eagle.stream.pipeline.runner.PipelineRunner
+
+object Pipeline
+  extends PipelineRunner
+  with PipelineParser
+  with PipelineCompiler
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
new file mode 100644
index 0000000..2ff81d4
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline.annotation
+
+import scala.annotation.StaticAnnotation
+
+case class Extension(extType:String) extends StaticAnnotation
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
new file mode 100644
index 0000000..df8bbe5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline.compiler
+
+
+import org.apache.eagle.datastream.core._
+import org.apache.eagle.stream.pipeline.extension.ModuleManager._
+import org.apache.eagle.stream.pipeline.parser._
+import org.apache.eagle.stream.pipeline.utils.CompileException
+
+trait PipelineCompiler {
+  def compile(pipeline:Pipeline):StreamContext = {
+    val context = new StreamContext(pipeline.config)
+    val dataflow = pipeline.dataflow
+    val dag = new StreamDAG(context.dag)
+    dataflow.getProcessors.map(buildStreamProducer(dag,_)).foreach(producer =>{
+      producer.initWith(dag.graph,pipeline.config)
+      dag.addVertex(producer)
+    })
+    dataflow.getConnectors.foreach(connector =>{
+      val from = dag.getNodeByName(connector.from).get
+      val to = dag.getNodeByName(connector.to).get
+      dag.addEdge(from,to,buildStreamConnector(from,to,dataflow,connector))
+    })
+    context
+  }
+  private def  buildStreamProducer(dag:StreamDAG,processor:Processor):StreamProducer[Any] = {
+    if(findModuleType(processor.getType)){
+      getModuleMapperByType(processor.getType).map(processor).nameAs(processor.getId).stream(processor.streamId)
+    } else {
+      throw new CompileException(s"Unknown processor type [${processor.getType}]")
+    }
+  }
+  private def buildStreamConnector(from:StreamProducer[Any],to:StreamProducer[Any],dataflow:DataFlow,connector:Connector):StreamConnector[Any,Any]={
+    var groupByIndexes:Seq[Int] = connector.groupByIndexes.orNull
+    if(groupByIndexes!=null ){
+      if(connector.groupByFields.isDefined) throw new CompileException(s"Both ${Connector.GROUP_BY_FIELD_FIELD} and ${Connector.GROUP_BY_INDEX_FIELD} is defined at same time")
+    } else if(connector.groupByFields.isDefined){
+      groupByIndexes = connector.groupByFields.get.map(dataflow.getProcessor(from.name).get.getSchema.get.indexOfAttribute)
+    }
+    if(groupByIndexes == null){
+      ShuffleConnector(from,to)
+    } else {
+      GroupbyFieldsConnector(from,to,groupByIndexes)
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
new file mode 100644
index 0000000..9c3e9d5
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
@@ -0,0 +1,182 @@
+package org.apache.eagle.stream.pipeline.extension
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.typesafe.config.ConfigFactory
+import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider
+import org.apache.eagle.datastream.core._
+import org.apache.eagle.partition.PartitionStrategy
+import org.apache.eagle.stream.pipeline.parser.Processor
+import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConverters._
+
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+
+object ModuleManager{
+  def getModuleMapperByType(moduleType:String):ModuleMapper = {
+    classOfProcessorMapping(moduleType)
+  }
+
+  def findModuleType(moduleType:String):Boolean = classOfProcessorMapping.contains(moduleType)
+
+  val classOfProcessorMapping = Map[String,ModuleMapper](
+    "KafkaSource" -> KafkaSourceStreamProducer,
+    "KafkaSink" -> KafkaSinkStreamProducer,
+    "Alert" -> AlertStreamProducer,
+    "Persistence" -> PersistProducer,
+    "Aggregator" -> AggregatorProducer,
+    "Console" -> ConsoleStreamProducer
+  )
+}
+
+trait ModuleMapper{
+  def getType:String
+  def map(module:Processor):StreamProducer[Any]
+}
+object KafkaSourceStreamProducer extends ModuleMapper{
+  def getType = "KafkaSource"
+  override def map(module:Processor): StreamProducer[Any] = {
+    val config = module.getConfig
+    new StormSourceProducer[Any](new KafkaSourcedSpoutProvider(null).getSpout(ConfigFactory.parseMap(config.asJava)))
+  }
+}
+object KafkaSinkStreamProducer extends ModuleMapper{
+  def getType = "KafkaSink"
+  override def map(module:Processor): StreamProducer[Any] = {
+    val config = module.getConfig
+    ForeachProducer[AnyRef](KafkaSinkExecutor(config))
+  }
+}
+object ConsoleStreamProducer extends ModuleMapper{
+  override def getType: String = "Stdout"
+  override def map(module:Processor): StreamProducer[Any] = ForeachProducer[Any](m=>print(s"$m\n"))
+}
+object AlertStreamProducer extends ModuleMapper{
+  def getType:String = "Alert"
+  override def map(module:Processor): StreamProducer[Any] = {
+    val config = module.getConfig
+    val moduleId = module.getId
+    // Support create functional AlertStreamProducer constructor
+    new AlertStreamProducer (
+      upStreamNames = config.getOrElse("upStreamNames",if(module.inputIds!=null) module.inputIds.asJava else null).asInstanceOf[java.util.List[String]],
+      alertExecutorId = config.getOrElse("alertExecutorId",moduleId).asInstanceOf[String],
+      consume = config.getOrElse("consume",true).asInstanceOf[Boolean],
+      strategy = config.get("strategy") match {case Some(strategy)=> Class.forName(strategy.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy] case None => null}
+    )
+  }
+}
+
+object PersistProducer extends ModuleMapper{
+  override def getType = "Persistence"
+  override def map(module:Processor): StreamProducer[Any] = {
+    val config = module.getConfig
+    new PersistProducer(config.getOrElse("executorId",module.getId).asInstanceOf[String],StorageType.withName(config.getOrElse("storageType",null).asInstanceOf[String]))
+  }
+}
+
+object AggregatorProducer extends ModuleMapper{
+  override def getType: String = "Aggregator"
+  override def map(module:Processor): StreamProducer[Any] = {
+    val config = module.getConfig
+    new AggregateProducer(
+      upStreamNames = config.getOrElse("upStreamNames",if(module.inputIds!=null) module.inputIds.asJava else null).asInstanceOf[java.util.List[String]],
+      config.getOrElse("analyzer",module.getId).asInstanceOf[String],
+      config.get("sql") match {case Some(sql) => sql.asInstanceOf[String] case None => null },
+      config.get("strategy") match {case Some(strategy)=> Class.forName(strategy.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy] case None => null}
+    )
+  }
+}
+
+/**
+  * @todo currently support single topic now, should support topic selector
+  * @param config
+  */
+case class KafkaSinkExecutor(config:Map[String,AnyRef]) extends ((AnyRef) => Unit) with Serializable{
+  val TOPIC_KEY = "topic"
+  def getDefaultProps = {
+    val props = new Properties()
+    props.putAll(Map[String,AnyRef](
+      "bootstrap.servers" -> "localhost:6667",
+      "acks" -> "all",
+      "retries" -> "3",
+      "batch.size" -> "16384",
+      "linger.ms" -> "1",
+      "buffer.memory" -> "33554432",
+      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
+      "value.serializer" -> classOf[org.apache.eagle.dataproc.impl.storm.kafka.JsonSerializer].getCanonicalName
+    ).asJava)
+    props
+  }
+
+  @transient var initialized:AtomicBoolean = new AtomicBoolean(false)
+  @transient var producer:KafkaProducer[String,AnyRef] = null
+  @transient var topic:String = null
+  @transient var timeoutMs:Long = 3000
+
+  val LOG = LoggerFactory.getLogger(classOf[KafkaSinkExecutor])
+
+  private def init():Unit = {
+    if(this.initialized != null && this.initialized.get()){
+      LOG.info("Already initialized, skip")
+      return
+    }
+    this.initialized = new AtomicBoolean(false)
+    if (producer != null) {
+      LOG.info(s"Closing $producer")
+      producer.close()
+    }
+    LOG.info("Initializing and creating Kafka Producer")
+    if (config.contains(TOPIC_KEY)) {
+      this.topic = config.get(TOPIC_KEY).get.asInstanceOf[String]
+    } else {
+      throw new IllegalStateException("topic is not defined")
+    }
+    val props = getDefaultProps
+    props.putAll((config - TOPIC_KEY).asJava)
+    producer = new KafkaProducer[String, AnyRef](props)
+    LOG.info(s"Created new KafkaProducer: $producer")
+    initialized.set(true)
+  }
+
+  override def apply(value: AnyRef): Unit = {
+    if(initialized == null || !initialized.get()) init()
+    if(topic == null) throw new IllegalStateException("topic is not defined")
+    val isList = value.isInstanceOf[java.util.List[AnyRef]]
+    val record: ProducerRecord[String, AnyRef] = if(isList){
+      val list = value.asInstanceOf[java.util.List[AnyRef]]
+      if(list.size() == 1) {
+        new ProducerRecord[String, AnyRef](topic, value.asInstanceOf[java.util.List[AnyRef]].get(0))
+      }else{
+        new ProducerRecord[String, AnyRef](topic, value)
+      }
+    }else{
+      new ProducerRecord[String, AnyRef](topic,value)
+    }
+    producer.send(record,new Callback(){
+      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
+        if(exception!=null){
+          LOG.error(s"Failed to send record $value to topic: $topic",exception)
+        }
+      }
+    })
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
new file mode 100644
index 0000000..7e1f4cf
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline.parser
+
+import com.typesafe.config.Config
+import org.apache.eagle.stream.pipeline.utils.ParseException
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+import scala.collection.mutable
+
+
+class DataFlow {
+  def getInputs(id: String):Seq[Processor] = {
+    this.getConnectors.filter(_.to.equals(id)).map(c => getProcessor(c.from).get)
+  }
+
+  /**
+    * Connect if not, do nothing if already connected
+    *
+    * @param from
+    * @param to
+    */
+  def connect(from: String, to: String): Unit = {
+    val connector = Connector(from,to,null)
+    var exists = false
+    connectors.foreach(c => exists = (c.from.equals(from) && c.to.equals(to)) || exists)
+    if(!exists) addConnector(connector)
+  }
+
+  private var processors = mutable.Map[String,Processor]()
+  private var connectors = mutable.Seq[Connector]()
+  def setProcessors(processors:Seq[Processor]):Unit = {
+    processors.foreach{module =>
+      this.processors.put(module.getId,module)
+    }
+  }
+  def setProcessors(processors:mutable.Map[String,Processor]):Unit = {
+    this.processors = processors
+  }
+  def setConnectors(connectors:Seq[Connector]):Unit = {
+    connectors.foreach(connector =>{
+      this.connectors :+= connector
+    })
+  }
+  def addProcessor(module:Processor):Unit = {
+    if(contains(module)) throw new IllegalArgumentException(s"Duplicated processor id error, ${module.getId} has already been defined as ${getProcessor(module.getId)}")
+    processors.put(module.getId,module)
+  }
+
+  def contains(module:Processor):Boolean = processors.contains(module.getId)
+  def addConnector(connector:Connector):Unit = {
+    connectors :+= connector
+  }
+  def getProcessors:Seq[Processor] = processors.values.toSeq
+  def getProcessor(processorId:String):Option[Processor] = processors.get(processorId)
+  def getConnectors:Seq[Connector] = connectors
+}
+
+/**
+  * Stream Processor
+  *
+  * @param processorId
+  * @param processorType
+  * @param schema
+  * @param processorConfig
+  */
+case class Processor(var processorId:String = null,var processorType:String = null,var schema:Schema = null, var processorConfig:Map[String,AnyRef] = null) extends Serializable {
+  private[pipeline] var inputs:Seq[Processor] = null
+  private[pipeline] var inputIds:Seq[String] = null
+
+  def getId:String = processorId
+  def getType:String = processorType
+  def getConfig:Map[String,AnyRef] = processorConfig
+  def getSchema:Option[Schema] = if(schema == null) None else Some(schema)
+
+  /**
+    * @todo assume processorId as streamId
+    * @return
+    */
+  def streamId = processorId
+}
+
+case class Connector (from:String,to:String, config:Map[String,AnyRef]) extends Serializable{
+  import Connector._
+
+  def group:Option[String] = config.get(GROUP_FIELD).asInstanceOf[Option[String]]
+  def groupByFields:Option[Seq[String]] = config.get(GROUP_BY_FIELD_FIELD) match {
+    case Some(obj) => Some(obj.asInstanceOf[java.util.List[String]].asScala.toSeq)
+    case None => None
+  }
+  def groupByIndexes:Option[Seq[Int]] = config.get(GROUP_BY_INDEX_FIELD) match {
+    case Some(obj) => Some(obj.asInstanceOf[java.util.List[java.lang.Integer]].asScala.toSeq.map(Int.unbox(_)))
+    case None => None
+  }
+}
+
+object Connector{
+  val GROUP_FIELD = "grouping"
+  val GROUP_BY_FIELD_FIELD = "groupByField"
+  val GROUP_BY_INDEX_FIELD = "groupByIndex"
+}
+
+private [pipeline]
+object Processor {
+  val SCHEMA_FIELD:String = "schema"
+  val INPUTS_FIELD = "inputs"
+  def parse(processorId:String,processorType:String,context:Map[String,AnyRef], schemaSet:SchemaSet):Processor = {
+    val schema = context.get(SCHEMA_FIELD) match {
+      case Some(schemaDef) => schemaDef match {
+        case schemaId:String => schemaSet.get(schemaId).getOrElse {
+          throw new ParseException(s"Schema [$schemaId] is not found but referred by [$processorType:$processorId] in $context")
+        }
+        case schemaMap:java.util.HashMap[String,AnyRef] => Schema.parse(schemaMap.toMap)
+        case _ => throw new ParseException(s"Illegal value for schema: $schemaDef")
+      }
+      case None => null
+    }
+    val instance = new Processor(processorId,processorType,schema,context-SCHEMA_FIELD)
+    if(context.contains(INPUTS_FIELD)) instance.inputIds = context.get(INPUTS_FIELD).get.asInstanceOf[java.util.List[String]].asScala.toSeq
+    instance
+  }
+}
+
+
+trait DataFlowParser {
+  def parse(config:Config,schemaSet:SchemaSet = SchemaSet.empty()):DataFlow = {
+    val dataw = new DataFlow()
+    val map = config.root().unwrapped().toMap
+
+    // Parse processors and connectors
+    map.foreach(entry => {
+      parseSingle(entry._1,entry._2.asInstanceOf[java.util.HashMap[String,AnyRef]].toMap,dataw,schemaSet)
+    })
+    expand(dataw)
+    validate(dataw)
+    dataw
+  }
+
+  private def expand(datafw: DataFlow):Unit = {
+    datafw.getProcessors.foreach(proc =>{
+      if(proc.inputIds!=null) {
+        proc.inputIds.foreach(id => {
+          // connect if not
+          datafw.connect(id,proc.getId)
+        })
+      }
+      proc.inputs = datafw.getInputs(proc.getId)
+      proc.inputIds = proc.inputs.map(_.getId)
+    })
+  }
+
+  private def
+  validate(pipeline:DataFlow): Unit ={
+    def checkModuleExists(id:String): Unit ={
+      pipeline.getProcessor(id).orElse {
+        throw new ParseException(s"Stream [$id] is not defined before being referred")
+      }
+    }
+
+    pipeline.getConnectors.foreach {connector =>
+      checkModuleExists(connector.from)
+      checkModuleExists(connector.to)
+    }
+  }
+
+  private def
+  parseSingle(identifier:String,config:Map[String,AnyRef],dataflow:DataFlow, schemaSet: SchemaSet):Unit = {
+    Identifier.parse(identifier) match {
+      case DefinitionIdentifier(processorType) => {
+        config foreach {entry =>
+          dataflow.addProcessor(Processor.parse(entry._1, processorType,entry._2.asInstanceOf[java.util.HashMap[String, AnyRef]].toMap,schemaSet))
+        }
+      }
+      case ConnectionIdentifier(fromIds,toId) => fromIds.foreach { fromId =>
+        if(fromId.eq(toId)) throw new ParseException(s"Can't connect $fromId to $toId")
+        dataflow.addConnector(Connector(fromId,toId,config))
+      }
+      case _ => ???
+    }
+  }
+}
+
+
+private[pipeline] trait Identifier
+
+private[pipeline] case class DefinitionIdentifier(moduleType: String) extends Identifier
+private[pipeline] case class ConnectionIdentifier(fromIds: Seq[String], toId: String) extends Identifier
+
+private[pipeline] object Identifier {
+  val ConnectorFlag = "->"
+  val UnitFlagSplitPattern = "\\|"
+  val UnitFlagChar = "|"
+  val ConnectorPattern = s"([\\w-|\\s]+)\\s+$ConnectorFlag\\s+([\\w-_]+)".r
+  def parse(identifier: String): Identifier = {
+    // ${id} -> ${id}
+    ConnectorPattern.findFirstMatchIn(identifier) match {
+      case Some(matcher) => {
+        if(matcher.groupCount != 2){
+          throw new ParseException(s"Illegal connector definition: $identifier")
+        }else{
+          val source = matcher.group(1)
+          val destination = matcher.group(2)
+          if(source.contains(UnitFlagChar)) {
+            val sources = source.split(UnitFlagSplitPattern).toSeq
+            ConnectionIdentifier(sources.map{_.trim()},destination)
+          }else{
+            ConnectionIdentifier(Seq(source),destination)
+          }
+        }
+      }
+      case None => {
+        if(identifier.contains(ConnectorFlag)) throw new ParseException(s"Failed to parse $identifier")
+        DefinitionIdentifier(identifier)
+      }
+    }
+  }
+}
+
+object DataFlow extends DataFlowParser
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
new file mode 100644
index 0000000..cc1e009
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
@@ -0,0 +1,89 @@
+package org.apache.eagle.stream.pipeline.parser
+
+import java.io.File
+
+import com.typesafe.config.{Config, ConfigFactory}
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+case class Pipeline(config:Config,dataflow:DataFlow)
+
+/**
+ * Pipeline configuration parser
+ *
+ * For example:
+ *
+ * {{{
+ * <code>
+ * {
+ *    config {
+ *      execution.environment.config = someValue
+ *    }
+ *    schema {
+ *      metricStreamSchema {
+ *        metric: string
+ *        value: double
+ *        timestamp: long
+ *      }
+ *    }
+ *    dataflow {
+ *      kafkaSource.source1 {
+ *        schema = "metricStreamSchema"
+ *      }
+ *      kafkaSource.source2 {
+ *        schema = {
+ *          metric: string
+ *          value: double
+ *          timestamp: long
+ *        }
+ *      }
+ *    }
+ * }
+ * </code>
+ * }}}
+ */
+trait PipelineParser{
+  val CONFIG_FIELD = "config"
+  val SCHEMA_FIELD = "schema"
+  val DATAFLOW_FIELD = "dataflow"
+
+  def parse(config:Config):Pipeline = {
+    if(config.isEmpty) throw new IllegalArgumentException("Pipeline configuration is empty")
+    var pConfig:Config = ConfigFactory.empty()
+    var pSchemaSet:SchemaSet = SchemaSet.empty()
+    var pDataflow:DataFlow = null
+    if(config.hasPath(CONFIG_FIELD)) pConfig = config.getConfig(CONFIG_FIELD)
+    if(config.hasPath(SCHEMA_FIELD)) pSchemaSet = SchemaSet.parse(config.getConfig(SCHEMA_FIELD))
+    if(config.hasPath(DATAFLOW_FIELD)) pDataflow = DataFlow.parse(config.getConfig(DATAFLOW_FIELD),pSchemaSet)
+
+    // Merge pipeline config over base config
+    val baseConfig =ConfigFactory.load()
+    pConfig = if(pConfig!=null) pConfig.withFallback(baseConfig) else baseConfig
+    new Pipeline(pConfig,pDataflow)
+  }
+
+  def parseString(config:String):Pipeline = parse(ConfigFactory.parseString(config))
+  def parseResource(resource:String):Pipeline = {
+    // TODO: Load environment, currently hard-code with storm
+    if(resource.startsWith("/") || resource.startsWith("./")){
+      parse(ConfigFactory.parseFile(new File(resource)))
+    } else{
+      parse(ConfigFactory.parseResourcesAnySyntax(getClass.getClassLoader,resource))
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
new file mode 100644
index 0000000..7653f9e
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline.parser
+
+import com.typesafe.config.Config
+
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.mutable
+
+class Field(name:String) extends Serializable{
+  def getName:String = name
+}
+
+case class StringField(name:String) extends Field(name)
+case class LongField(name:String) extends Field(name)
+case class IntegerField(name:String) extends Field(name)
+case class BooleanField(name:String) extends Field(name)
+case class FloatField(name:String) extends Field(name)
+case class DoubleField(name:String) extends Field(name)
+case class DatetimeField(name:String,format:String) extends Field(name)
+
+object Field{
+  def string(name:String) = StringField(name)
+  def long(name:String) = LongField(name)
+  def integer(name:String) = IntegerField(name)
+  def boolean(name:String) = BooleanField(name)
+  def float(name:String) = FloatField(name)
+  def double(name:String) = DoubleField(name)
+  def datetime(name:String)(format:String) = DatetimeField(name,format)
+
+  def apply(name:String,typeName:String):Field = typeName match {
+    case "string" => string(name)
+    case "long" => long(name)
+    case "integer" => integer(name)
+    case "boolean" => boolean(name)
+    case "float" => float(name)
+    case "double" => double(name)
+    case _ => throw new UnsupportedOperationException(s"""Unknown attribute type $typeName for attribute "$name"""")
+  }
+}
+
+case class Schema(attributes:Seq[Field]) extends Serializable{
+  def getAttribute(attributeName:String):Option[Field]={
+    if(attributes != null){
+      attributes.find(_.getName.eq(attributeName))
+    }else None
+  }
+
+  def indexOfAttribute(attributeName:String):Int = {
+    if(attributes != null){
+      attributes.indexWhere(_.getName.eq(attributeName))
+    } else -1
+  }
+
+  @throws[IllegalArgumentException]
+  def indexOfAttributeOrException(attributeName:String):Int = {
+    if(attributes != null){
+      attributes.indexWhere(_.getName.eq(attributeName))
+    } else throw new IllegalArgumentException(s"Attribute [$attributeName] is not found in stream $this")
+  }
+}
+
+object Schema{
+  def parse(map:Map[String,AnyRef]):Schema = {
+    new Schema(map.keys.map {attributeName =>
+      map(attributeName) match{
+        case simpleType:String => Field(attributeName,simpleType)
+        case complexType:java.util.Map[String,AnyRef] => throw new IllegalStateException(s"ComplexType attribute definition is not supported yet [$attributeName : $complexType] ")
+        case otherType@_ => throw new IllegalStateException(s"Illegal attribute definition $attributeName : $otherType")
+      }
+    }.toSeq)
+  }
+
+  /**
+   * @param attributes support string, symbol, Attribute and so on.
+   * @return
+   */
+  def build(attributes:Seq[AnyRef]):Schema = {
+    new Schema(attributes.map{ a:AnyRef =>
+      a match {
+        case t:(String, AnyRef) => {
+          t._2 match {
+            case v:String => Field(t._1,v)
+            case v:Symbol => Field(t._1,v.name)
+            case _ => throw new UnsupportedOperationException(s"Illegal attribute definition $a")
+          }
+        }
+        case t:Field => t
+        case _ => throw new UnsupportedOperationException(s"Illegal attribute definition $a")
+      }
+    })
+  }
+}
+
+private[pipeline] class StreamUndefinedException(message:String = "stream is not defined",throwable: Throwable = null) extends Exception(message,throwable)
+
+private[pipeline] class SchemaSet {
+  private val processorSchemaCache = mutable.Map[String,Schema]()
+  def set(schemaId:String,schema:Schema):Unit = {
+    if(processorSchemaCache.contains(schemaId)) throw new IllegalArgumentException(
+      s"""
+         |Failed to define schema for $schemaId as $schema,
+         |because it has been defined as ${processorSchemaCache(schemaId)},
+         |please call updateSchema(processorId,schema) instead
+       """)
+    processorSchemaCache.put(schemaId,schema)
+  }
+  def get(schemaId:String):Option[Schema] = processorSchemaCache.get(schemaId)
+}
+
+private[pipeline] object SchemaSet{
+  def empty() = new SchemaSet()
+  /**
+   * For example:
+   *
+   * <code>
+   *    {
+   *      metricStream {
+   *        metric: string
+   *        value: double
+   *        timestamp: long
+   *      }
+   *    }
+   * </code>
+   * @param schemaConfig
+   * @return
+   */
+  def parse(schemaConfig:Map[String,AnyRef]):SchemaSet = {
+    val schemas = new SchemaSet()
+    schemaConfig.foreach(entry =>{
+      schemas.set(entry._1,Schema.parse(entry._2.asInstanceOf[java.util.HashMap[String,AnyRef]].toMap))
+    })
+    schemas
+  }
+
+  def parse(config:Config):SchemaSet = parse(config.root().unwrapped().asInstanceOf[java.util.HashMap[String,AnyRef]].toMap)
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
new file mode 100644
index 0000000..1c964e1
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
@@ -0,0 +1,115 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package org.apache.eagle.stream.pipeline.runner
+
+import java.util
+
+import com.typesafe.config.Config
+import org.apache.commons.cli.{CommandLine, Options}
+import org.apache.eagle.dataproc.util.ConfigOptionParser
+import org.apache.eagle.datastream.ExecutionEnvironments.storm
+import org.apache.eagle.datastream.core.ExecutionEnvironment
+import org.apache.eagle.stream.pipeline.compiler.PipelineCompiler
+import org.apache.eagle.stream.pipeline.parser.PipelineParser
+import org.slf4j.LoggerFactory
+
+import scala.reflect.runtime.{universe => ru}
+
+trait PipelineRunner extends PipelineParser with PipelineCompiler{
+  import PipelineCLIOptionParser._
+  private val LOG = LoggerFactory.getLogger("PipelineCLIOptionParser")
+  def submit[T <: ExecutionEnvironment](resource:String)(implicit typeTag:ru.TypeTag[T]) =
+    compile(parseResource(resource)).submit[T]
+  def submit(resource:String,clazz:Class[ExecutionEnvironment]) =
+    compile(parseResource(resource)).submit(clazz)
+  def submit(pipelineConfig:Config,clazz:Class[ExecutionEnvironment]) =
+    compile(parse(pipelineConfig)).submit(clazz)
+  def submit[T <: ExecutionEnvironment](pipelineConfig:Config)(implicit typeTag: ru.TypeTag[T]) =
+    compile(parse(pipelineConfig)).submit[T]
+
+  def apply(args:Array[String]):PipelineRunner = {
+    new ConfigOptionParser().load(args)
+    this
+  }
+
+  def main(args: Array[String]): Unit = {
+    val config = PipelineCLIOptionParser.load(args)
+    if(config.hasPath(PIPELINE_CONFIG_KEY)) {
+      submit[storm](config.getString(PIPELINE_CONFIG_KEY))
+    } else {
+      sys.error(
+        s"""
+           |Error: --$PIPELINE_OPT_KEY is required
+           |$USAGE
+         """.stripMargin)
+    }
+  }
+}
+
+private[runner] object PipelineCLIOptionParser extends ConfigOptionParser{
+  val LOG = LoggerFactory.getLogger("PipelineCLIOptionParser")
+  val PIPELINE_OPT_KEY="pipeline"
+
+  val PIPELINE_CONFIG_KEY="pipeline.config"
+
+  val CONFIG_OPT_KEY="conf"
+  val CONFIG_RESOURCE_KEY="config.resource"
+  val CONFIG_FILE_KEY="config.file"
+  val USAGE =
+    """
+      |Usage: java org.apache.eagle.stream.pipeline.Pipeline [options]
+      |
+      |Options:
+      |   --pipeline   pipeline configuration
+      |   --conf       common configuration
+      |   --env        storm (support spark, etc later)
+      |   --mode       local/remote/cluster
+    """.stripMargin
+  
+  override protected def options(): Options = {
+    val options = super.options()
+    options.addOption(PIPELINE_OPT_KEY, true, "Pipeline configuration file")
+    options.addOption(CONFIG_OPT_KEY, true, "Config properties file")
+    options
+  }
+
+  override protected def parseCommand(cmd: CommandLine): util.Map[String, String] = {
+    val map = super.parseCommand(cmd)
+
+    if (cmd.hasOption(PIPELINE_OPT_KEY)) {
+      val pipelineConf = cmd.getOptionValue(PIPELINE_OPT_KEY)
+      if(pipelineConf == null){
+        throw new IllegalArgumentException(s"--$PIPELINE_OPT_KEY should not be null")
+      } else {
+        LOG.info(s"Set $PIPELINE_CONFIG_KEY as $pipelineConf")
+        map.put(PIPELINE_CONFIG_KEY, pipelineConf)
+      }
+    }
+
+    if(cmd.hasOption(CONFIG_OPT_KEY)){
+      val commonConf = cmd.getOptionValue(CONFIG_OPT_KEY)
+      if(commonConf.contains("/")){
+        LOG.info(s"Set $CONFIG_FILE_KEY as $commonConf")
+        map.put(CONFIG_FILE_KEY, commonConf)
+      }else {
+        LOG.info(s"Set $CONFIG_RESOURCE_KEY $commonConf")
+        map.put(CONFIG_RESOURCE_KEY, commonConf)
+      }
+    }
+    map
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
new file mode 100644
index 0000000..1102a33
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.stream.pipeline.utils
+
+class ParseException(message:String) extends Exception(message)
+class CompileException(message:String) extends Exception(message)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
new file mode 100644
index 0000000..d285a6f
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	"eagleProps" : {
+		"dataJoinPollIntervalSec" : 30
+		"mailHost" : "smtp.server.host"
+		"mailSmtpPort":"25"
+		"mailDebug" : "true"
+		"eagleService": {
+			"host": "localhost"
+			"port": 38080
+			"username": "admin"
+			"password": "secret"
+		}
+	}
+	"dynamicConfigSource" : {
+		"enabled" : true
+		"initDelayMillis" : 0
+		"delayMillis" : 30000
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
new file mode 100644
index 0000000..4250681
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
@@ -0,0 +1,20 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# ./eagle-pipeline.sh --pipeline [pipeline-definition-config] --config [base-configuration]
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
new file mode 100644
index 0000000..c8a4f46
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
@@ -0,0 +1,19 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
new file mode 100644
index 0000000..8bd4fd3
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
@@ -0,0 +1,131 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	config {
+		alertExecutorConfigs {
+			defaultAlertExecutor  {
+				"parallelism" : 1
+				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+				"needValidation" : "true"
+			}
+		}
+		eagleProps  {
+			"site" : "sandbox"
+			"dataSource": "eventSource"
+			"dataJoinPollIntervalSec" : 30
+			"mailHost" : "mail.host.com"
+			"mailSmtpPort":"25"
+			"mailDebug" : "true"
+			"eagleService": {
+				"host": "localhost"
+				"port": 38080
+				"username": "admin"
+				"password": "secret"
+			}
+		}
+		dynamicConfigSource  {
+			"enabled" : true
+			"initDelayMillis" : 0
+			"delayMillis" : 30000
+		}
+	}
+
+	schema {
+		metricStreamSchema {
+			metric: string
+			value: double
+			timestamp: long
+		}
+	}
+
+	dataflow {
+		KafkaSource.metricStream_1 {
+			parallism = 1000
+			topic = "metric_event_1"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+			schema = "metricStreamSchema"
+		}
+
+		KafkaSource.metricStream_2 {
+			parallism = 1000
+			topic = "metric_event_2"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.metricStream_3{
+			parallism = 1000
+			topic = "metric_event_3"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+			schema = "metricStreamSchema"
+		}
+
+		KafkaSink.metricStore {
+			schema = "metricStreamSchema"
+			parallism = 1000
+			topic = "metric_event_2"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		Alert.alert {
+			upStreamNames = [metricStream_1,metricStream_2]
+			alertExecutorId = defaultAlertExecutor
+		}
+
+//		aggregator.aggreator {
+//			executor = "aggreationExecutor"
+//		}
+
+		metricStream_1|metricStream_2 -> alert {
+			group = shuffle
+		}
+
+		metricStream_1|metricStream_2 -> metricStore {
+			group = shuffle
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
new file mode 100644
index 0000000..f458464
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
@@ -0,0 +1,93 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	config {
+		envContextConfig {
+			"env" : "storm"
+			"mode" : "local"
+			"topologyName" : "dsl-based-topology"
+		}
+		eagleProps  {
+			"site" : "sandbox"
+			"dataSource": "eventSource"
+			"dataJoinPollIntervalSec" : 30
+			"mailHost" : "mail.host.com"
+			"mailSmtpPort":"25"
+			"mailDebug" : "true"
+			"eagleService": {
+				"host": "localhost"
+				"port": 38080
+				"username": "admin"
+				"password": "secret"
+			}
+		}
+		dynamicConfigSource  {
+			"enabled" : true
+			"initDelayMillis" : 0
+			"delayMillis" : 30000
+		}
+	}
+
+	dataflow {
+		KafkaSource.metricStream_1 {
+			parallism = 1000
+			topic = "metric_event_1"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.metricStream_2 {
+			parallism = 1000
+			topic = "metric_event_2"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.metricStream_3{
+			parallism = 1000
+			topic = "metric_event_3"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		Console.printer {}
+
+		metricStream_1|metricStream_2|metricStream_3 -> printer {
+			grouping = shuffle
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
new file mode 100644
index 0000000..2ee316c
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	config {
+		envContextConfig {
+			"env" : "storm"
+			"mode" : "local"
+			"topologyName" : "dsl-based-topology"
+			"parallelismConfig" : {
+				"kafkaMsgConsumer" : 1
+			}
+		}
+		alertExecutorConfigs {
+			defaultAlertExecutor  {
+				"parallelism" : 1
+				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+				"needValidation" : "true"
+			}
+		}
+		eagleProps  {
+			"site" : "sandbox"
+			"dataSource": "HADOOP"
+			"dataJoinPollIntervalSec" : 30
+			"mailHost" : "atom.corp.ebay.com"
+			"mailSmtpPort":"25"
+			"mailDebug" : "true"
+			"eagleService": {
+				"host": "localhost"
+				"port": 38080
+				"username": "admin"
+				"password": "secret"
+			}
+		}
+		dynamicConfigSource  {
+			"enabled" : true
+			"initDelayMillis" : 0
+			"delayMillis" : 30000
+		}
+	}
+
+	schema {
+//		JmxStreamOne {
+//			attributes {
+//				metric: string
+//				value: double
+//				timestamp: long
+//			}
+//			alertExecutorId = [defaultAlertExecutor,anotherAlertExecutor]
+//		}
+		JmxStreamOne {
+			metric: string
+			value: double
+			timestamp: long
+		}
+	}
+
+	dataflow {
+		KafkaSource.JmxStreamOne {
+			parallism = 1000
+			topic = "metric_event_1"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.JmxStreamTwo {
+			parallism = 1000
+			topic = "metric_event_2"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.JmxStreamThree{
+			parallism = 1000
+			topic = "metric_event_3"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		Console.printer {
+			format = "%s"
+		}
+
+		KafkaSink.metricStore {
+			topic = "metric_event_persist"
+		}
+
+//		KafkaSink.alertStore {
+//			"topic" = "alert_persist"
+//			"bootstrap.servers" = "localhost:6667"
+//		}
+
+		Alert.alert {
+			inputs = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
+
+			upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
+			alertExecutorId = defaultAlertExecutor
+		}
+
+//		Aggregator.aggreator {
+//			upStreamNames = []
+//			analyzerId = ""
+//			cepQl = ""
+//			strategy = ""
+//		}
+
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alert {
+			grouping = shuffle
+		}
+
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
+			grouping = shuffle
+		}
+
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
+			grouping = shuffle
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
new file mode 100644
index 0000000..9e297ee
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
@@ -0,0 +1,122 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing software
+# distributed under the License is distributed on an "AS IS" BASIS
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	config {
+		envContextConfig {
+			"env" : "storm"
+			"mode" : "local"
+			"topologyName" : "dsl-topology"
+			"parallelismConfig" : {
+				"kafkaMsgConsumer" : 1
+			}
+		}
+		alertExecutorConfigs {
+			defaultAlertExecutor  {
+				"parallelism" : 1
+				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
+				"needValidation" : "true"
+			}
+		}
+		eagleProps {
+			"site" : "sandbox"
+			"dataSource": "HADOOP"
+		}
+	}
+	
+	dataflow {
+		KafkaSource.JmxStreamOne {
+			parallism = 1000
+			topic = "metric_event_1"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.JmxStreamTwo {
+			parallism = 1000
+			topic = "metric_event_2"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		KafkaSource.JmxStreamThree{
+			parallism = 1000
+			topic = "metric_event_3"
+			zkConnection = "localhost:2181"
+			zkConnectionTimeoutMS = 15000
+			consumerGroupId = "Consumer"
+			fetchSize = 1048586
+			transactionZKServers = "localhost"
+			transactionZKPort = 2181
+			transactionZKRoot = "/consumers"
+			transactionStateUpdateMS = 2000
+			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
+		}
+
+		Console.printer {
+			format = "%s"
+		}
+
+		KafkaSink.metricStore {
+			topic = "metric_event_persist"
+		}
+
+//		KafkaSink.aggSink {
+//			topic = "metric_agg_persist"
+//		}
+
+		Alert.defaultAlertExecutor {
+			// upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
+			// alertExecutorId = defaultAlertExecutor
+		}
+
+//		Aggregator.Aggregator{ sql = """
+//				@info("query")
+//				from JmxStreamOne[value > 100.0] select * insert into OutputStream;
+//			"""
+//		}
+//		JmxStreamOne -> Aggregator{
+//			grouping = shuffle
+//		}
+//		Aggregator -> aggregatedSink{
+//			grouping = shuffle
+//		}
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor {
+			grouping = shuffle
+		}
+
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
+			grouping = shuffle
+		}
+
+		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
+			grouping = shuffle
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c1485aac/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
new file mode 100644
index 0000000..7b552da
--- /dev/null
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/scala/org/apache/eagle/stream/pipeline/ConfigSpec.scala
@@ -0,0 +1,37 @@
+package org.apache.eagle.stream.pipeline
+
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{FlatSpec, Matchers}
+
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+class ConfigSpec extends FlatSpec with Matchers{
+  "Config" should "be overrode correctly" in {
+    val conf1 = ConfigFactory.parseString(
+      """
+        |value=1
+      """.stripMargin)
+    val conf2 = ConfigFactory.parseString(
+      """
+        |value=2
+      """.stripMargin)
+    val conf3 = conf1.withFallback(conf2)
+    val conf4 = conf2.withFallback(conf1)
+    conf3.getInt("value") should be(1)
+    conf4.getInt("value") should be(2)
+  }
+}