You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/14 06:23:08 UTC

[08/13] incubator-eagle git commit: EAGLE-341 clean inner process alert engine code

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf
deleted file mode 100644
index 4c21a7c..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-### scheduler properties
-appCommandLoaderIntervalSecs = 1
-appHealthCheckIntervalSecs = 5
-
-### execution platform properties
-envContextConfig.env = "storm"
-envContextConfig.url = "http://sandbox.hortonworks.com:8744"
-envContextConfig.nimbusHost = "sandbox.hortonworks.com"
-envContextConfig.nimbusThriftPort = 6627
-envContextConfig.jarFile = "/dir-to-jar/eagle-topology-0.3.0-incubating-assembly.jar"
-
-### default topology properties
-eagleProps.mailHost = "mailHost.com"
-eagleProps.mailSmtpPort = "25"
-eagleProps.mailDebug = "true"
-eagleProps.eagleService.host = "localhost"
-eagleProps.eagleService.port = 9099
-eagleProps.eagleService.username = "admin"
-eagleProps.eagleService.password = "secret"
-eagleProps.dataJoinPollIntervalSec = 30
-
-dynamicConfigSource.enabled = true
-dynamicConfigSource.initDelayMillis = 0
-dynamicConfigSource.delayMillis = 30000
-
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties
deleted file mode 100644
index 25331ab..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-
- eagle.log.dir=../logs
- eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
- log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
- log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
- log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-## 30-day backup
-# log4j.appender.DRFA.MaxBackupIndex=30
- log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
deleted file mode 100644
index e87ee92..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import com.typesafe.config.Config
-import org.apache.eagle.stream.application.TopologyExecutable
-import org.slf4j.LoggerFactory
-
-class MockTopology extends TopologyExecutable {
-  private val LOG = LoggerFactory.getLogger(classOf[MockTopology])
-  override def submit(topology: String, config: Config): Unit = {
-    LOG.info(s"$topology is running")
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
deleted file mode 100644
index 1cad3a7..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.eagle.stream.application.scheduler
-
-import com.typesafe.config.ConfigFactory
-import org.apache.eagle.common.config.EagleConfigConstants
-import org.apache.eagle.stream.application.ExecutionPlatform
-import org.apache.eagle.stream.application.impl.StormExecutionPlatform
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-
-object StormApplicationManagerSpec extends App {
-  val manager: ExecutionPlatform = new StormExecutionPlatform
-  val baseConfig = ConfigFactory.load()
-  val topoConfigStr: String = "webConfig{\"hbase.zookeeper.property.clientPort\":\"2181\", \"hbase.zookeeper.quorum\":\"localhost\"}\nappConfig{\n  \"envContextConfig\" : {\n    \"env\" : \"storm\",\n    \"mode\" : \"cluster\",\n    \"topologyName\" : \"sandbox-hbaseSecurityLog-topology\",\n    \"stormConfigFile\" : \"security-auditlog-storm.yaml\",\n    \"parallelismConfig\" : {\n      \"kafkaMsgConsumer\" : 1,\n      \"hbaseSecurityLogAlertExecutor*\" : 1\n    }\n  },\n  \"dataSourceConfig\": {\n    \"topic\" : \"sandbox_hbase_security_log\",\n    \"zkConnection\" : \"127.0.0.1:2181\",\n    \"zkConnectionTimeoutMS\" : 15000,\n    \"brokerZkPath\" : \"/brokers\",\n    \"fetchSize\" : 1048586,\n    \"deserializerClass\" : \"org.apache.eagle.security.hbase.parse.HbaseAuditLogKafkaDeserializer\",\n    \"transactionZKServers\" : \"127.0.0.1\",\n    \"transactionZKPort\" : 2181,\n    \"transactionZKRoot\" : \"/consumers\",\n    \"consumerGroupId\" : \"eagle.hbasesecurity.consumer\",\n  
   \"transactionStateUpdateMS\" : 2000\n  },\n  \"alertExecutorConfigs\" : {\n     \"hbaseSecurityLogAlertExecutor\" : {\n       \"parallelism\" : 1,\n       \"partitioner\" : \"org.apache.eagle.policy.DefaultPolicyPartitioner\"\n       \"needValidation\" : \"true\"\n     }\n  },\n  \"eagleProps\" : {\n    \"site\" : \"sandbox\",\n    \"application\": \"hbaseSecurityLog\",\n    \"dataJoinPollIntervalSec\" : 30,\n    \"mailHost\" : \"mailHost.com\",\n    \"mailSmtpPort\":\"25\",\n    \"mailDebug\" : \"true\",\n    \"eagleService\": {\n      \"host\": \"localhost\",\n      \"port\": 9099\n      \"username\": \"admin\",\n      \"password\": \"secret\"\n    }\n  },\n  \"dynamicConfigSource\" : {\n    \"enabled\" : true,\n    \"initDelayMillis\" : 0,\n    \"delayMillis\" : 30000\n  }\n}"
-
-  val topoConfig = ConfigFactory.parseString(topoConfigStr)
-  val conf = topoConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(baseConfig)
-
-  //val (ret, nextState) = manager.execute("START", topologyDescModel, null, conf)
-  //println(s"Result: ret=$ret, nextState=$nextState")
-}
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
deleted file mode 100644
index 3db2d67..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.{TestActorRef, TestKit}
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{Ignore, BeforeAndAfterAll, MustMatchers, WordSpecLike}
-
-@Ignore
-class TestSchedulerSpec extends TestKit(ActorSystem("stream-app-scheduler"))
-with WordSpecLike with MustMatchers with BeforeAndAfterAll {
-
-  "A Scheduler actor" must {
-    "Forward a message it receives" in {
-      val coordinator = TestActorRef[StreamAppCoordinator]
-      coordinator ! CommandLoaderEvent
-      expectNoMsg()
-    }
-  }
-
-  "A Integrated test" must {
-    "run end-to-end" in {
-      val coordinator = system.actorOf(Props[StreamAppCoordinator])
-      coordinator ! CommandLoaderEvent
-      expectNoMsg()
-    }
-  }
-
-  override def afterAll(): Unit = {
-    super.afterAll()
-    system.shutdown()
-  }
-}
-
-@Ignore
-object TestStreamAppScheduler extends App {
-  val conf: String = """
-                          akka.loglevel = "DEBUG"
-                          akka.actor.debug {
-                            receive = on
-                            lifecycle = on
-                          }
-                     """
-  new ApplicationScheduler().start(ConfigFactory.parseString(conf))
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/pom.xml b/eagle-core/eagle-app/pom.xml
index 6f3069c..5637b01 100644
--- a/eagle-core/eagle-app/pom.xml
+++ b/eagle-core/eagle-app/pom.xml
@@ -32,7 +32,6 @@
 
     <modules>
         <module>eagle-app-base</module>
-        <module>eagle-stream-application-manager</module>
         <module>eagle-application-service</module>
     </modules>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md b/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
deleted file mode 100644
index b8a0bdc..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
+++ /dev/null
@@ -1,80 +0,0 @@
-<!--
-{% comment %}
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to you under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-{% endcomment %}
--->
-
-Eagle Declarative Streaming DSL
-===============================
-
-DSL Format
-----------
-
-	{
-		config {
-		  config.key = configValue
-		}
-
-		schema {
-		  metricStreamSchema {
-		    metric: string
-		    value: double
-		    timestamp: long
-		  }
-		}
-
-		dataflow {
-		  kafkaSource.source1 {
-		    schema = "metricStreamSchema"
-		  }
-		  kafkaSource.source2 {
-		    schema = {
-		      metric: string
-		      value: double
-		      timestamp: long
-		    }
-		  }
-		}
-	}
-
-Usage
------
-
-	val pipeline = Pipeline.parseResource("pipeline.conf")
-	val stream = Pipeline.compile(pipeline)
-	stream.submit[storm]
-
-Features
---------
-* [x] Compile DSL configuration to Pipeline model
-* [x] Compile Pipeline model to Stream Execution Graph
-* [x] Submit Stream Execution Graph to actual running environment say storm
-* [x] Support Alert and Persistence for metric monitoring
-* [ ] Extensible stream module management and automatically scan and register module
-* [x] Pipeline runner CLI tool and shell script
-* [ ] Decouple pipeline compiler and scheduler into individual modules
-* [ ] Stream Pipeline Scheduler
-* [ ] Graph editor to define streaming graph in UI
-* [?] JSON/Config & Scala Case Class Mapping (https://github.com/scala/pickling)
-* [?] Raw-message-structure-oriented programming is a little ugly; we should define a generic message/event consisting of [payload:stream/timestamp/serializer/deserializer,data:message]
-* [ ] Provide stream schema inline and send to metadata when submitting
-* [ ] UI should support specifying executorId when defining new stream
-* [ ] Lacks an entity named StreamEntity for the workflow of defining topology&policy end-to-end
-* [!] Fix configuration conflict, should pass through Config instead of ConfigFactory.load() manually
-* [ ] Override application configuration with pipeline configuration
-* [ ] Refactor schema registration structure and automatically submit stream schema when submitting pipeline
-* [ ] Submit alertStream, alertExecutorId mapping to AlertExecutorService when submitting pipeline
-* [x] Supports `inputs` field to define connector
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
deleted file mode 100644
index 18fb610..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
+++ /dev/null
@@ -1,156 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>eagle-data-process-parent</artifactId>
-        <groupId>org.apache.eagle</groupId>
-        <version>0.5.0-incubating-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-    <artifactId>eagle-stream-pipeline</artifactId>
-    <dependencies>
-        <!--<dependency>-->
-            <!--<groupId>org.reflections</groupId>-->
-            <!--<artifactId>reflections</artifactId>-->
-        <!--</dependency>-->
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-service-base</artifactId>
-            <version>${project.version}</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.ow2.asm</groupId>
-                    <artifactId>asm-all</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>asm</groupId>
-                    <artifactId>asm</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>asm</groupId>
-                    <artifactId>asm-all</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>asm</groupId>
-                    <artifactId>asm-commons</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>asm</groupId>
-                    <artifactId>asm-tree</artifactId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>servlet-api</artifactId>
-                    <groupId>javax.servlet</groupId>
-                </exclusion>
-                <exclusion>
-                    <artifactId>slf4j-log4j12</artifactId>
-                    <groupId>org.slf4j</groupId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-storage-base</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-	<dependency>
-       		<groupId>log4j</groupId>
-		<artifactId>log4j</artifactId>
-      	</dependency>
-<!--        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <scope>test</scope>
-        </dependency>
--->
-        <!--<dependency>-->
-            <!--<groupId>org.scala-lang</groupId>-->
-            <!--<artifactId>scala-reflect</artifactId>-->
-        <!--</dependency>-->
-        <!--<dependency>-->
-            <!--<groupId>org.scala-lang</groupId>-->
-            <!--<artifactId>scala-compiler</artifactId>-->
-            <!--<version>${scala.version}.0</version>-->
-        <!--</dependency>-->
-        <dependency>
-            <groupId>org.scalatest</groupId>
-            <artifactId>scalatest_${scala.version}</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.eagle</groupId>
-            <artifactId>eagle-stream-process-api</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <!--<dependency>-->
-            <!--<groupId>com.typesafe.akka</groupId>-->
-            <!--<artifactId>akka-testkit_${scala.version}</artifactId>-->
-            <!--<version>${akka.actor.version}</version>-->
-            <!--<scope>test</scope>-->
-        <!--</dependency>-->
-    </dependencies>
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.scala-tools</groupId>
-                <artifactId>maven-scala-plugin</artifactId>
-                <executions>
-                    <execution>
-                        <id>scala-compile-first</id>
-                        <phase>process-resources</phase>
-                        <goals>
-                            <goal>add-source</goal>
-                            <goal>compile</goal>
-                        </goals>
-                    </execution>
-                    <execution>
-                        <id>scala-test-compile</id>
-                        <phase>process-test-resources</phase>
-                        <goals>
-                            <goal>testCompile</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <skipTests>true</skipTests>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.scalatest</groupId>
-                <artifactId>scalatest-maven-plugin</artifactId>
-                <configuration>
-                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
-                    <junitxml>.</junitxml>
-                    <filereports>TestSuite.txt</filereports>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>test</id>
-                        <goals>
-                            <goal>test</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
deleted file mode 100644
index 65ab390..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline
-
-
-import org.apache.eagle.stream.pipeline.compiler.PipelineCompiler
-import org.apache.eagle.stream.pipeline.parser.PipelineParser
-import org.apache.eagle.stream.pipeline.runner.PipelineRunner
-
-object Pipeline
-  extends PipelineRunner
-  with PipelineParser
-  with PipelineCompiler
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
deleted file mode 100644
index 2ff81d4..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.annotation
-
-import scala.annotation.StaticAnnotation
-
-case class Extension(extType:String) extends StaticAnnotation
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
deleted file mode 100644
index df8bbe5..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.compiler
-
-
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.stream.pipeline.extension.ModuleManager._
-import org.apache.eagle.stream.pipeline.parser._
-import org.apache.eagle.stream.pipeline.utils.CompileException
-
-trait PipelineCompiler {
-  def compile(pipeline:Pipeline):StreamContext = {
-    val context = new StreamContext(pipeline.config)
-    val dataflow = pipeline.dataflow
-    val dag = new StreamDAG(context.dag)
-    dataflow.getProcessors.map(buildStreamProducer(dag,_)).foreach(producer =>{
-      producer.initWith(dag.graph,pipeline.config)
-      dag.addVertex(producer)
-    })
-    dataflow.getConnectors.foreach(connector =>{
-      val from = dag.getNodeByName(connector.from).get
-      val to = dag.getNodeByName(connector.to).get
-      dag.addEdge(from,to,buildStreamConnector(from,to,dataflow,connector))
-    })
-    context
-  }
-  private def  buildStreamProducer(dag:StreamDAG,processor:Processor):StreamProducer[Any] = {
-    if(findModuleType(processor.getType)){
-      getModuleMapperByType(processor.getType).map(processor).nameAs(processor.getId).stream(processor.streamId)
-    } else {
-      throw new CompileException(s"Unknown processor type [${processor.getType}]")
-    }
-  }
-  private def buildStreamConnector(from:StreamProducer[Any],to:StreamProducer[Any],dataflow:DataFlow,connector:Connector):StreamConnector[Any,Any]={
-    var groupByIndexes:Seq[Int] = connector.groupByIndexes.orNull
-    if(groupByIndexes!=null ){
-      if(connector.groupByFields.isDefined) throw new CompileException(s"Both ${Connector.GROUP_BY_FIELD_FIELD} and ${Connector.GROUP_BY_INDEX_FIELD} is defined at same time")
-    } else if(connector.groupByFields.isDefined){
-      groupByIndexes = connector.groupByFields.get.map(dataflow.getProcessor(from.name).get.getSchema.get.indexOfAttribute)
-    }
-    if(groupByIndexes == null){
-      ShuffleConnector(from,to)
-    } else {
-      GroupbyFieldsConnector(from,to,groupByIndexes)
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
deleted file mode 100644
index 2174560..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-package org.apache.eagle.stream.pipeline.extension
-
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicBoolean
-
-import com.typesafe.config.ConfigFactory
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.partition.PartitionStrategy
-import org.apache.eagle.stream.pipeline.parser.Processor
-import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
-//import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-
-
-/**
-  * Registry of the module types that can be referenced from a pipeline definition.
-  */
-object ModuleManager{
-  /** Module type name -> mapper building the stream producer for that type. */
-  val classOfProcessorMapping = Map[String,ModuleMapper](
-    "KafkaSource" -> KafkaSourceStreamProducer,
-    "KafkaSink" -> KafkaSinkStreamProducer,
-    "Alert" -> AlertStreamProducer,
-    "Persistence" -> PersistProducer,
-    "Aggregator" -> AggregatorProducer,
-    "Console" -> ConsoleStreamProducer
-  )
-
-  /** Whether a mapper is registered under exactly this (case-sensitive) type name. */
-  def findModuleType(moduleType:String):Boolean = classOfProcessorMapping.get(moduleType).isDefined
-
-  /**
-    * Looks up the mapper registered for the given type name.
-    * Uses Map.apply, so an unregistered name raises NoSuchElementException.
-    */
-  def getModuleMapperByType(moduleType:String):ModuleMapper = classOfProcessorMapping.apply(moduleType)
-}
-
-/**
-  * Converts a parsed processor definition into a stream producer.
-  */
-trait ModuleMapper{
-  /** Name of the module type handled by this mapper. */
-  def getType:String
-  /** Builds the stream producer for the given processor definition. */
-  def map(module:Processor):StreamProducer[Any]
-}
-/**
-  * Mapper for "KafkaSource" processors: wraps the spout created by
-  * KafkaSourcedSpoutProvider from the processor configuration.
-  */
-object KafkaSourceStreamProducer extends ModuleMapper{
-  override def getType:String = "KafkaSource"
-  override def map(module:Processor): StreamProducer[Any] = {
-    val provider = new KafkaSourcedSpoutProvider(null)
-    // The processor configuration map is handed to the provider as a typesafe Config
-    val spoutConfig = ConfigFactory.parseMap(module.getConfig.asJava)
-    new StormSourceProducer[Any](provider.getSpout(spoutConfig))
-  }
-}
-/**
-  * Mapper for "KafkaSink" processors: every event is passed to a
-  * KafkaSinkExecutor created from the processor configuration.
-  */
-object KafkaSinkStreamProducer extends ModuleMapper{
-  override def getType:String = "KafkaSink"
-  override def map(module:Processor): StreamProducer[Any] =
-    ForeachProducer[AnyRef](KafkaSinkExecutor(module.getConfig))
-}
-/**
-  * Mapper for "Console" processors: prints every event followed by a newline.
-  */
-object ConsoleStreamProducer extends ModuleMapper{
-  // Matches the key this mapper is registered under in ModuleManager ("Console"),
-  // like every other mapper; it previously reported "Stdout".
-  override def getType: String = "Console"
-  override def map(module:Processor): StreamProducer[Any] = ForeachProducer[Any](m=>print(s"$m\n"))
-}
-/**
-  * Mapper for "Alert" processors.
-  *
-  * Configuration keys read: "upStreamNames", "alertExecutorId", "consume", "strategy".
-  */
-object AlertStreamProducer extends ModuleMapper{
-  override def getType:String = "Alert"
-  override def map(module:Processor): StreamProducer[Any] = {
-    val settings = module.getConfig
-    // Defaults to the processor's resolved inputs (or null when there are none)
-    val declaredInputs = if(module.inputIds!=null) module.inputIds.asJava else null
-    val upStreams = settings.getOrElse("upStreamNames",declaredInputs).asInstanceOf[java.util.List[String]]
-    // The executor id defaults to the processor id
-    val executorId = settings.getOrElse("alertExecutorId",module.getId).asInstanceOf[String]
-    val shouldConsume = settings.getOrElse("consume",true).asInstanceOf[Boolean]
-    // "strategy" holds a class name that is instantiated through its no-arg constructor
-    val partitionStrategy = settings.get("strategy")
-      .map(className => Class.forName(className.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy])
-      .orNull
-    new AlertStreamProducer (
-      upStreamNames = upStreams,
-      alertExecutorId = executorId,
-      consume = shouldConsume,
-      strategy = partitionStrategy
-    )
-  }
-}
-
-/**
-  * Mapper for "Persistence" processors.
-  *
-  * Configuration keys read: "executorId" (defaults to the processor id) and "storageType".
-  */
-object PersistProducer extends ModuleMapper{
-  override def getType:String = "Persistence"
-  override def map(module:Processor): StreamProducer[Any] = {
-    val settings = module.getConfig
-    val executorId = settings.getOrElse("executorId",module.getId).asInstanceOf[String]
-    // null when "storageType" is not configured; StorageType.withName then decides how to react
-    val storageTypeName = settings.getOrElse("storageType",null).asInstanceOf[String]
-    new PersistProducer(executorId,StorageType.withName(storageTypeName))
-  }
-}
-
-/**
-  * Mapper for "Aggregator" processors.
-  *
-  * Configuration keys read: "upStreamNames", "analyzer", "sql", "strategy".
-  */
-object AggregatorProducer extends ModuleMapper{
-  override def getType: String = "Aggregator"
-  override def map(module:Processor): StreamProducer[Any] = {
-    val settings = module.getConfig
-    // Defaults to the processor's resolved inputs (or null when there are none)
-    val declaredInputs = if(module.inputIds!=null) module.inputIds.asJava else null
-    val upStreams = settings.getOrElse("upStreamNames",declaredInputs).asInstanceOf[java.util.List[String]]
-    // The analyzer id defaults to the processor id
-    val analyzerId = settings.getOrElse("analyzer",module.getId).asInstanceOf[String]
-    val sql = settings.get("sql").map(_.asInstanceOf[String]).orNull
-    // "strategy" holds a class name that is instantiated through its no-arg constructor
-    val partitionStrategy = settings.get("strategy")
-      .map(className => Class.forName(className.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy])
-      .orNull
-    new AggregateProducer(
-      upStreamNames = upStreams,
-      analyzerId,
-      sql,
-      partitionStrategy
-    )
-  }
-}
-
-/**
-  * Companion of the KafkaSinkExecutor case class. It is intentionally empty:
-  * the only member, a logger, is commented out.
-  */
-object KafkaSinkExecutor{
-//  val LOG = LoggerFactory.getLogger(classOf[KafkaSinkExecutor])
-}
-
-/**
-  * Serializable function that sends every value it is applied to, to a single Kafka topic.
-  *
-  * The producer is created lazily on the first call of `apply`.
-  *
-  * @todo currently support single topic now, should support topic selector
-  * @param config sink configuration; must contain "topic". All other entries are
-  *               copied into the producer properties, overriding the defaults
-  *               returned by `getDefaultProps`
-  */
-case class KafkaSinkExecutor(config:Map[String,AnyRef]) extends ((AnyRef) => Unit) with Serializable{
-
-  // Configuration key holding the destination topic name
-  val TOPIC_KEY = "topic"
-
-  /** Default producer properties, used for every key that `config` does not override. */
-  def getDefaultProps = {
-    val props = new Properties()
-    props.putAll(Map[String,AnyRef](
-      "bootstrap.servers" -> "localhost:6667",
-      "acks" -> "all",
-      "retries" -> "3",
-      "batch.size" -> "16384",
-      "linger.ms" -> "1",
-      "buffer.memory" -> "33554432",
-      "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
-      "value.serializer" -> classOf[org.apache.eagle.dataproc.impl.storm.kafka.JsonSerializer].getCanonicalName
-    ).asJava)
-    props
-  }
-
-  // Transient state is not serialized with the executor; init() and apply() therefore
-  // treat a null `initialized` as "not initialized yet" (presumably the state after
-  // deserialization on a worker - TODO confirm).
-  @transient var initialized:AtomicBoolean = new AtomicBoolean(false)
-  @transient var producer:KafkaProducer[String,AnyRef] = null
-  @transient var topic:String = null
-  // Not read anywhere in this class
-  @transient var timeoutMs:Long = 3000
-
-  /**
-    * Creates the Kafka producer unless it has already been created.
-    * Throws IllegalStateException when `config` contains no "topic" entry.
-    */
-  private def init():Unit = {
-    if(this.initialized != null && this.initialized.get()){
-//      LOG.info("Already initialized, skip")
-      return
-    }
-    this.initialized = new AtomicBoolean(false)
-    // Close a producer left over from an earlier, incomplete initialization
-    if (producer != null) {
-//      LOG.info(s"Closing $producer")
-      producer.close()
-    }
-//    LOG.info("Initializing and creating Kafka Producer")
-    if (config.contains(TOPIC_KEY)) {
-      this.topic = config.get(TOPIC_KEY).get.asInstanceOf[String]
-    } else {
-      throw new IllegalStateException("topic is not defined")
-    }
-    val props = getDefaultProps
-    // Everything except the topic is passed through to the producer as a property
-    props.putAll((config - TOPIC_KEY).asJava)
-    producer = new KafkaProducer[String, AnyRef](props)
-//    LOG.info(s"Created new KafkaProducer: $producer")
-    initialized.set(true)
-  }
-
-  /**
-    * Sends `value` to the configured topic without a record key.
-    * A java.util.List holding exactly one element is unwrapped and only that element
-    * is sent; any other value, including lists of other sizes, is sent as is.
-    */
-  override def apply(value: AnyRef): Unit = {
-    if(initialized == null || !initialized.get()) init()
-    if(topic == null) throw new IllegalStateException("topic is not defined")
-    val isList = value.isInstanceOf[java.util.List[AnyRef]]
-    val record: ProducerRecord[String, AnyRef] = if(isList){
-      val list = value.asInstanceOf[java.util.List[AnyRef]]
-      if(list.size() == 1) {
-        new ProducerRecord[String, AnyRef](topic, value.asInstanceOf[java.util.List[AnyRef]].get(0))
-      }else{
-        new ProducerRecord[String, AnyRef](topic, value)
-      }
-    }else{
-      new ProducerRecord[String, AnyRef](topic,value)
-    }
-    // NOTE(review): the exception below is thrown from inside the producer callback, so it
-    // is presumably raised on the thread that runs the callback rather than propagated to
-    // the caller of apply - confirm how send failures are expected to surface.
-    producer.send(record,new Callback(){
-      override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
-        if(exception!=null){
-//          LOG.error(s"Failed to send record $value to topic: $topic",exception)
-          throw new IllegalStateException(s"Failed to send record $value to topic: $topic",exception)
-        }
-      }
-    })
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
deleted file mode 100644
index 7e1f4cf..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.parser
-
-import com.typesafe.config.Config
-import org.apache.eagle.stream.pipeline.utils.ParseException
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-import scala.collection.mutable
-
-
-/**
-  * Mutable description of a data flow: processors indexed by id plus the
-  * connectors between them.
-  */
-class DataFlow {
-  private var processors = mutable.Map[String,Processor]()
-  private var connectors = mutable.Seq[Connector]()
-
-  /**
-    * Processors connected into the processor with the given id.
-    * Every connector source must be a defined processor, otherwise Option.get fails.
-    */
-  def getInputs(id: String):Seq[Processor] =
-    for(connector <- this.getConnectors if connector.to.equals(id)) yield getProcessor(connector.from).get
-
-  /**
-    * Connect if not, do nothing if already connected
-    *
-    * @param from id of the upstream processor
-    * @param to id of the downstream processor
-    */
-  def connect(from: String, to: String): Unit = {
-    val alreadyConnected = connectors.exists(c => c.from.equals(from) && c.to.equals(to))
-    // Connectors created here carry no configuration (null)
-    if(!alreadyConnected) addConnector(Connector(from,to,null))
-  }
-
-  /** Adds (or overwrites by id) the given processors; existing ones are kept. */
-  def setProcessors(processors:Seq[Processor]):Unit =
-    for(module <- processors) this.processors.put(module.getId,module)
-
-  /** Replaces the whole processor map. */
-  def setProcessors(processors:mutable.Map[String,Processor]):Unit = {
-    this.processors = processors
-  }
-
-  /** Appends the given connectors to the existing ones. */
-  def setConnectors(connectors:Seq[Connector]):Unit = connectors.foreach(addConnector)
-
-  /** Adds a processor; a second processor with the same id is rejected. */
-  def addProcessor(module:Processor):Unit = {
-    if(contains(module)) throw new IllegalArgumentException(s"Duplicated processor id error, ${module.getId} has already been defined as ${getProcessor(module.getId)}")
-    processors.put(module.getId,module)
-  }
-
-  def contains(module:Processor):Boolean = processors.contains(module.getId)
-
-  def addConnector(connector:Connector):Unit = {
-    connectors = connectors :+ connector
-  }
-
-  def getProcessors:Seq[Processor] = processors.values.toSeq
-  def getProcessor(processorId:String):Option[Processor] = processors.get(processorId)
-  def getConnectors:Seq[Connector] = connectors
-}
-
-/**
-  * Stream Processor
-  *
-  * @param processorId id of the processor
-  * @param processorType module type name of the processor
-  * @param schema schema of the processor's stream, may be null
-  * @param processorConfig remaining processor configuration
-  */
-case class Processor(var processorId:String = null,var processorType:String = null,var schema:Schema = null, var processorConfig:Map[String,AnyRef] = null) extends Serializable {
-  // Both fields start as null and are assigned from within the pipeline package
-  private[pipeline] var inputs:Seq[Processor] = null
-  private[pipeline] var inputIds:Seq[String] = null
-
-  def getId:String = processorId
-  def getType:String = processorType
-  def getConfig:Map[String,AnyRef] = processorConfig
-  /** The schema wrapped in an Option; None when no schema is set. */
-  def getSchema:Option[Schema] = Option(schema)
-
-  /**
-    * @todo assume processorId as streamId
-    * @return the processor id, which doubles as the stream id
-    */
-  def streamId:String = processorId
-}
-
-/**
-  * Directed connection between two processors.
-  *
-  * @param from   id of the upstream processor
-  * @param to     id of the downstream processor
-  * @param config connector settings; may be null (DataFlow.connect creates
-  *               connectors without configuration), in which case every
-  *               accessor below returns None
-  */
-case class Connector (from:String,to:String, config:Map[String,AnyRef]) extends Serializable{
-  import Connector._
-
-  // Null-safe lookup: a connector without configuration simply has no settings
-  private def setting(key:String):Option[AnyRef] = Option(config).flatMap(_.get(key))
-
-  /** Value configured under "grouping", if any. */
-  def group:Option[String] = setting(GROUP_FIELD).asInstanceOf[Option[String]]
-
-  /** Field names configured under "groupByField", if any. */
-  def groupByFields:Option[Seq[String]] =
-    setting(GROUP_BY_FIELD_FIELD).map(_.asInstanceOf[java.util.List[String]].asScala.toSeq)
-
-  /** Field indexes configured under "groupByIndex", if any. */
-  def groupByIndexes:Option[Seq[Int]] =
-    setting(GROUP_BY_INDEX_FIELD).map(_.asInstanceOf[java.util.List[java.lang.Integer]].asScala.toSeq.map(Int.unbox(_)))
-}
-
-/** Configuration keys understood by [[Connector]]. */
-object Connector{
-  val GROUP_FIELD = "grouping"
-  val GROUP_BY_FIELD_FIELD = "groupByField"
-  val GROUP_BY_INDEX_FIELD = "groupByIndex"
-}
-
-/**
-  * Factory turning one processor definition of a data flow into a [[Processor]].
-  */
-private [pipeline]
-object Processor {
-  val SCHEMA_FIELD:String = "schema"
-  val INPUTS_FIELD = "inputs"
-
-  /**
-    * @param processorId   id of the processor being defined
-    * @param processorType module type name of the processor
-    * @param context       raw definition; "schema" is either the id of a schema in
-    *                      `schemaSet` or an inline java.util.HashMap definition,
-    *                      "inputs" is a list of upstream processor ids
-    * @param schemaSet     named schemas that "schema" may refer to
-    * @return processor whose configuration is `context` without the "schema" entry
-    * @throws ParseException if the schema id is unknown or the schema value has another type
-    */
-  def parse(processorId:String,processorType:String,context:Map[String,AnyRef], schemaSet:SchemaSet):Processor = {
-    val schema:Schema = context.get(SCHEMA_FIELD).map {
-      case schemaId:String => schemaSet.get(schemaId).getOrElse {
-        throw new ParseException(s"Schema [$schemaId] is not found but referred by [$processorType:$processorId] in $context")
-      }
-      case schemaMap:java.util.HashMap[String,AnyRef] => Schema.parse(schemaMap.toMap)
-      case schemaDef => throw new ParseException(s"Illegal value for schema: $schemaDef")
-    }.orNull
-
-    val instance = new Processor(processorId,processorType,schema,context-SCHEMA_FIELD)
-    context.get(INPUTS_FIELD).foreach { ids =>
-      instance.inputIds = ids.asInstanceOf[java.util.List[String]].asScala.toSeq
-    }
-    instance
-  }
-}
-
-
-/**
-  * Parses the "dataflow" section of a pipeline definition into a [[DataFlow]].
-  *
-  * Every top-level key is either a module type whose children are processor
-  * definitions, or a connection of the form "from -> to" / "a|b -> to".
-  */
-trait DataFlowParser {
-  /**
-    * @param config    data flow configuration
-    * @param schemaSet named schemas that processor definitions may refer to
-    * @return the parsed data flow with the inputs of every processor resolved
-    * @throws ParseException if a connection refers to an undefined processor
-    *                        or connects a processor to itself
-    */
-  def parse(config:Config,schemaSet:SchemaSet = SchemaSet.empty()):DataFlow = {
-    val dataflow = new DataFlow()
-    val definitions = config.root().unwrapped().toMap
-
-    // Parse processors and connectors
-    definitions.foreach { entry =>
-      parseSingle(entry._1,entry._2.asInstanceOf[java.util.HashMap[String,AnyRef]].toMap,dataflow,schemaSet)
-    }
-    connectInputs(dataflow)
-    // Validate before resolving inputs: resolving dereferences the source of every
-    // connector and would otherwise fail with a bare NoSuchElementException for an
-    // undefined processor instead of the descriptive ParseException.
-    validate(dataflow)
-    resolveInputs(dataflow)
-    dataflow
-  }
-
-  /** Turns the "inputs" declared on processors into connectors, unless already connected. */
-  private def connectInputs(dataflow: DataFlow):Unit = {
-    for(proc <- dataflow.getProcessors if proc.inputIds != null; id <- proc.inputIds) {
-      dataflow.connect(id,proc.getId)
-    }
-  }
-
-  /** Stores on every processor the processors (and their ids) connected into it. */
-  private def resolveInputs(dataflow: DataFlow):Unit = {
-    dataflow.getProcessors.foreach { proc =>
-      proc.inputs = dataflow.getInputs(proc.getId)
-      proc.inputIds = proc.inputs.map(_.getId)
-    }
-  }
-
-  /** Checks that both ends of every connector are defined processors. */
-  private def validate(pipeline:DataFlow): Unit = {
-    def checkModuleExists(id:String): Unit = {
-      if(pipeline.getProcessor(id).isEmpty) {
-        throw new ParseException(s"Stream [$id] is not defined before being referred")
-      }
-    }
-
-    pipeline.getConnectors.foreach { connector =>
-      checkModuleExists(connector.from)
-      checkModuleExists(connector.to)
-    }
-  }
-
-  /** Parses one top-level entry: processor definitions of one type, or one connection. */
-  private def parseSingle(identifier:String,config:Map[String,AnyRef],dataflow:DataFlow, schemaSet: SchemaSet):Unit = {
-    Identifier.parse(identifier) match {
-      case DefinitionIdentifier(processorType) =>
-        config.foreach { entry =>
-          dataflow.addProcessor(Processor.parse(entry._1, processorType,entry._2.asInstanceOf[java.util.HashMap[String, AnyRef]].toMap,schemaSet))
-        }
-      case ConnectionIdentifier(fromIds,toId) => fromIds.foreach { fromId =>
-        // Compare by value: `eq` is reference equality and never matched two
-        // separately created strings, so self-connections went undetected.
-        if(fromId == toId) throw new ParseException(s"Can't connect $fromId to $toId")
-        dataflow.addConnector(Connector(fromId,toId,config))
-      }
-      case other => throw new ParseException(s"Unsupported identifier [$identifier]: $other")
-    }
-  }
-}
-
-
-/** Parsed form of a top-level key of a data flow definition. */
-private[pipeline] trait Identifier
-
-/** Key naming a module type; its children are processor definitions of that type. */
-private[pipeline] case class DefinitionIdentifier(moduleType: String) extends Identifier
-/** Key of the form "from -> to" (or "a|b -> to") connecting processors. */
-private[pipeline] case class ConnectionIdentifier(fromIds: Seq[String], toId: String) extends Identifier
-
-/**
-  * Parser for the top-level keys of a data flow definition.
-  */
-private[pipeline] object Identifier {
-  val ConnectorFlag = "->"
-  val UnitFlagSplitPattern = "\\|"
-  val UnitFlagChar = "|"
-  val ConnectorPattern = s"([\\w-|\\s]+)\\s+$ConnectorFlag\\s+([\\w-_]+)".r
-
-  /**
-    * Classifies a key as a connection or as a module type.
-    *
-    * @param identifier key such as "KafkaSource", "a -> b" or "a|b -> c"
-    * @return a ConnectionIdentifier with trimmed source ids when the key matches
-    *         the connector pattern, otherwise a DefinitionIdentifier
-    * @throws ParseException if the key contains "->" but does not match the pattern
-    */
-  def parse(identifier: String): Identifier = {
-    // ${id} -> ${id}
-    ConnectorPattern.findFirstMatchIn(identifier) match {
-      case Some(matcher) =>
-        // Splitting on "|" also covers a single source, so every source id is
-        // trimmed the same way (a single source used to keep stray whitespace).
-        // The pattern always has two groups, so no group-count check is needed.
-        val sources = matcher.group(1).split(UnitFlagSplitPattern).toSeq.map(_.trim)
-        ConnectionIdentifier(sources,matcher.group(2))
-      case None =>
-        if(identifier.contains(ConnectorFlag)) throw new ParseException(s"Failed to parse $identifier")
-        DefinitionIdentifier(identifier)
-    }
-  }
-}
-
-/** Companion exposing the DataFlowParser methods as DataFlow.parse(...). */
-object DataFlow extends DataFlowParser
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
deleted file mode 100644
index eb09156..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.eagle.stream.pipeline.parser
-
-import java.io.File
-
-import com.typesafe.config.{Config, ConfigFactory}
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/** A parsed pipeline: its configuration together with its data flow. */
-case class Pipeline(config:Config,dataflow:DataFlow)
-
-/**
- * Pipeline configuration parser
- *
- * Module type names under "dataflow" are looked up as written (case-sensitive),
- * e.g. "KafkaSource".
- *
- * For example:
- *
- * {{{
- * <code>
- * {
- *    config {
- *      execution.environment.config = someValue
- *    }
- *    schema {
- *      metricStreamSchema {
- *        metric: string
- *        value: double
- *        timestamp: long
- *      }
- *    }
- *    dataflow {
- *      KafkaSource.source1 {
- *        schema = "metricStreamSchema"
- *      }
- *      KafkaSource.source2 {
- *        schema = {
- *          metric: string
- *          value: double
- *          timestamp: long
- *        }
- *      }
- *    }
- * }
- * </code>
- * }}}
- */
-trait PipelineParser{
-  val CONFIG_FIELD = "config"
-  val SCHEMA_FIELD = "schema"
-  val DATAFLOW_FIELD = "dataflow"
-
-  /**
-   * Parses a pipeline definition.
-   *
-   * @param config definition with the optional sections "config", "schema" and "dataflow"
-   * @return pipeline whose configuration is the "config" section falling back to
-   *         ConfigFactory.load(); its data flow is null when there is no "dataflow" section
-   * @throws IllegalArgumentException if the definition is empty
-   */
-  def parse(config:Config):Pipeline = {
-    if(config.isEmpty) throw new IllegalArgumentException("Pipeline configuration is empty")
-    val pConfig:Config =
-      if(config.hasPath(CONFIG_FIELD)) config.getConfig(CONFIG_FIELD) else ConfigFactory.empty()
-    val pSchemaSet:SchemaSet =
-      if(config.hasPath(SCHEMA_FIELD)) SchemaSet.parse(config.getConfig(SCHEMA_FIELD)) else SchemaSet.empty()
-    val pDataflow:DataFlow =
-      if(config.hasPath(DATAFLOW_FIELD)) DataFlow.parse(config.getConfig(DATAFLOW_FIELD),pSchemaSet) else null
-
-    // Merge pipeline config over base config
-    val baseConfig = ConfigFactory.load()
-    new Pipeline(pConfig.withFallback(baseConfig),pDataflow)
-  }
-
-  /** Parses a pipeline definition given as a configuration string. */
-  def parseString(config:String):Pipeline = parse(ConfigFactory.parseString(config))
-
-  /** Parses a pipeline definition string; entries of `config` take precedence over it. */
-  def parseStringWithConfig(dataFlow:String, config: Config):Pipeline = {
-    val pConfig = config.withFallback(ConfigFactory.parseString(dataFlow))
-    parse(pConfig)
-  }
-
-  /**
-   * Parses a pipeline definition from a file when `resource` starts with "/" or "./",
-   * otherwise from a resource of this class' class loader.
-   */
-  def parseResource(resource:String):Pipeline = {
-    // TODO: Load environment, currently hard-code with storm
-    if(resource.startsWith("/") || resource.startsWith("./")){
-      parse(ConfigFactory.parseFile(new File(resource)))
-    } else{
-      parse(ConfigFactory.parseResourcesAnySyntax(getClass.getClassLoader,resource))
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
deleted file mode 100644
index 7653f9e..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.parser
-
-import com.typesafe.config.Config
-
-import scala.collection.JavaConversions.mapAsScalaMap
-import scala.collection.mutable
-
-/** Base class of a named schema attribute. */
-class Field(name:String) extends Serializable{
-  def getName:String = name
-}
-
-// One case class per supported attribute type
-case class StringField(name:String) extends Field(name)
-case class LongField(name:String) extends Field(name)
-case class IntegerField(name:String) extends Field(name)
-case class BooleanField(name:String) extends Field(name)
-case class FloatField(name:String) extends Field(name)
-case class DoubleField(name:String) extends Field(name)
-// Unlike the other types, a datetime attribute also carries a format
-case class DatetimeField(name:String,format:String) extends Field(name)
-
-/**
-  * Factory methods for schema attributes.
-  */
-object Field{
-  def string(name:String):StringField = StringField(name)
-  def long(name:String):LongField = LongField(name)
-  def integer(name:String):IntegerField = IntegerField(name)
-  def boolean(name:String):BooleanField = BooleanField(name)
-  def float(name:String):FloatField = FloatField(name)
-  def double(name:String):DoubleField = DoubleField(name)
-  def datetime(name:String)(format:String):DatetimeField = DatetimeField(name,format)
-
-  /**
-    * Creates the attribute for a simple type name.
-    *
-    * @param name     attribute name
-    * @param typeName one of "string", "long", "integer", "boolean", "float", "double"
-    * @throws UnsupportedOperationException for any other type name
-    */
-  def apply(name:String,typeName:String):Field = typeName match {
-    case "string" => StringField(name)
-    case "long" => LongField(name)
-    case "integer" => IntegerField(name)
-    case "boolean" => BooleanField(name)
-    case "float" => FloatField(name)
-    case "double" => DoubleField(name)
-    case _ => throw new UnsupportedOperationException(s"""Unknown attribute type $typeName for attribute "$name"""")
-  }
-}
-
-/**
-  * Ordered list of the attributes of a stream.
-  *
-  * @param attributes attributes in index order; may be null
-  */
-case class Schema(attributes:Seq[Field]) extends Serializable{
-  // Attribute names are compared with == (value equality). The previous `eq` compared
-  // object references and therefore missed equal names held in different String instances.
-
-  /** The attribute with the given name, if defined. */
-  def getAttribute(attributeName:String):Option[Field]={
-    if(attributes != null) attributes.find(_.getName == attributeName) else None
-  }
-
-  /** Index of the attribute with the given name, or -1 when it is not defined. */
-  def indexOfAttribute(attributeName:String):Int = {
-    if(attributes != null) attributes.indexWhere(_.getName == attributeName) else -1
-  }
-
-  /**
-    * Index of the attribute with the given name.
-    * Throws IllegalArgumentException when the attribute is not defined,
-    * including when the schema has no attributes at all.
-    */
-  @throws[IllegalArgumentException]
-  def indexOfAttributeOrException(attributeName:String):Int = {
-    val index = indexOfAttribute(attributeName)
-    if(index < 0) throw new IllegalArgumentException(s"Attribute [$attributeName] is not found in stream $this")
-    index
-  }
-}
-
-/**
-  * Factory methods for [[Schema]].
-  */
-object Schema{
-  /**
-    * Builds a schema from a map of attribute name -> simple type name.
-    * NOTE(review): the attribute order follows the iteration order of the mapped
-    * key collection, which is presumably not the declaration order - confirm
-    * before relying on attribute indexes.
-    *
-    * @throws IllegalStateException if an attribute is defined by anything but a type name
-    */
-  def parse(map:Map[String,AnyRef]):Schema = {
-    def toField(attributeName:String):Field = map(attributeName) match{
-      case simpleType:String => Field(attributeName,simpleType)
-      case complexType:java.util.Map[String,AnyRef] => throw new IllegalStateException(s"ComplexType attribute definition is not supported yet [$attributeName : $complexType] ")
-      case otherType@_ => throw new IllegalStateException(s"Illegal attribute definition $attributeName : $otherType")
-    }
-    new Schema(map.keys.map(toField).toSeq)
-  }
-
-  /**
-   * @param attributes support string, symbol, Attribute and so on.
-   * @return schema with one field per element, in the given order
-   */
-  def build(attributes:Seq[AnyRef]):Schema = {
-    def toField(a:AnyRef):Field = a match {
-      // (name, type) pair, the type being given as a String or a Symbol
-      case t:(String, AnyRef) => t._2 match {
-        case v:String => Field(t._1,v)
-        case v:Symbol => Field(t._1,v.name)
-        case _ => throw new UnsupportedOperationException(s"Illegal attribute definition $a")
-      }
-      case t:Field => t
-      case _ => throw new UnsupportedOperationException(s"Illegal attribute definition $a")
-    }
-    new Schema(attributes.map(toField))
-  }
-}
-
-/** Exception with the default message "stream is not defined" and an optional cause. */
-private[pipeline] class StreamUndefinedException(message:String = "stream is not defined",throwable: Throwable = null) extends Exception(message,throwable)
-
-/**
-  * Set of schemas indexed by schema id. A schema id can be defined only once.
-  */
-private[pipeline] class SchemaSet {
-  private val processorSchemaCache = mutable.Map[String,Schema]()
-
-  /**
-    * Registers a schema under the given id.
-    * Throws IllegalArgumentException when the id is already defined.
-    */
-  def set(schemaId:String,schema:Schema):Unit = {
-    // stripMargin removes the leading "|" margins, which previously leaked into the message
-    if(processorSchemaCache.contains(schemaId)) throw new IllegalArgumentException(
-      s"""
-         |Failed to define schema for $schemaId as $schema,
-         |because it has been defined as ${processorSchemaCache(schemaId)},
-         |please call updateSchema(processorId,schema) instead
-       """.stripMargin)
-    processorSchemaCache.put(schemaId,schema)
-  }
-
-  /** The schema registered under the given id, if any. */
-  def get(schemaId:String):Option[Schema] = processorSchemaCache.get(schemaId)
-}
-
-/**
-  * Factory methods for [[SchemaSet]].
-  */
-private[pipeline] object SchemaSet{
-  def empty() = new SchemaSet()
-
-  /**
-   * Builds a schema set from a map of schema id -> schema definition.
-   *
-   * For example:
-   *
-   * <code>
-   *    {
-   *      metricStream {
-   *        metric: string
-   *        value: double
-   *        timestamp: long
-   *      }
-   *    }
-   * </code>
-   * @param schemaConfig schema id -> definition, each definition being a java.util.HashMap
-   * @return schema set holding one parsed schema per entry
-   */
-  def parse(schemaConfig:Map[String,AnyRef]):SchemaSet = {
-    val schemas = new SchemaSet()
-    for(entry <- schemaConfig) {
-      val definition = entry._2.asInstanceOf[java.util.HashMap[String,AnyRef]].toMap
-      schemas.set(entry._1,Schema.parse(definition))
-    }
-    schemas
-  }
-
-  /** Builds a schema set from the unwrapped root object of the given configuration. */
-  def parse(config:Config):SchemaSet = {
-    val unwrapped = config.root().unwrapped().asInstanceOf[java.util.HashMap[String,AnyRef]]
-    parse(unwrapped.toMap)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
deleted file mode 100644
index 1c964e1..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package org.apache.eagle.stream.pipeline.runner
-
-import java.util
-
-import com.typesafe.config.Config
-import org.apache.commons.cli.{CommandLine, Options}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.apache.eagle.datastream.ExecutionEnvironments.storm
-import org.apache.eagle.datastream.core.ExecutionEnvironment
-import org.apache.eagle.stream.pipeline.compiler.PipelineCompiler
-import org.apache.eagle.stream.pipeline.parser.PipelineParser
-import org.slf4j.LoggerFactory
-
-import scala.reflect.runtime.{universe => ru}
-
-/**
-  * Parses, compiles and submits pipelines; also usable as a command line entry point.
-  */
-trait PipelineRunner extends PipelineParser with PipelineCompiler{
-  import PipelineCLIOptionParser._
-  private val LOG = LoggerFactory.getLogger("PipelineCLIOptionParser")
-
-  /** Submits the pipeline defined by a file or classpath resource to environment T. */
-  def submit[T <: ExecutionEnvironment](resource:String)(implicit typeTag:ru.TypeTag[T]) = {
-    val pipeline = parseResource(resource)
-    compile(pipeline).submit[T]
-  }
-
-  /** Submits the pipeline defined by a file or classpath resource to the given environment class. */
-  def submit(resource:String,clazz:Class[ExecutionEnvironment]) = {
-    val pipeline = parseResource(resource)
-    compile(pipeline).submit(clazz)
-  }
-
-  /** Submits the pipeline defined by a configuration to the given environment class. */
-  def submit(pipelineConfig:Config,clazz:Class[ExecutionEnvironment]) = {
-    val pipeline = parse(pipelineConfig)
-    compile(pipeline).submit(clazz)
-  }
-
-  /** Submits the pipeline defined by a configuration to environment T. */
-  def submit[T <: ExecutionEnvironment](pipelineConfig:Config)(implicit typeTag: ru.TypeTag[T]) = {
-    val pipeline = parse(pipelineConfig)
-    compile(pipeline).submit[T]
-  }
-
-  /** Loads the command line arguments through ConfigOptionParser and returns this runner. */
-  def apply(args:Array[String]):PipelineRunner = {
-    new ConfigOptionParser().load(args)
-    this
-  }
-
-  /** Command line entry point: submits the pipeline given by --pipeline to storm. */
-  def main(args: Array[String]): Unit = {
-    val config = PipelineCLIOptionParser.load(args)
-    // Guard clause: sys.error throws, so nothing is submitted without a pipeline
-    if(!config.hasPath(PIPELINE_CONFIG_KEY)) {
-      sys.error(
-        s"""
-           |Error: --$PIPELINE_OPT_KEY is required
-           |$USAGE
-         """.stripMargin)
-    }
-    submit[storm](config.getString(PIPELINE_CONFIG_KEY))
-  }
-}
-
-/**
-  * Command line parser adding the options --pipeline and --conf to ConfigOptionParser.
-  */
-private[runner] object PipelineCLIOptionParser extends ConfigOptionParser{
-  val LOG = LoggerFactory.getLogger("PipelineCLIOptionParser")
-  val PIPELINE_OPT_KEY="pipeline"
-
-  val PIPELINE_CONFIG_KEY="pipeline.config"
-
-  val CONFIG_OPT_KEY="conf"
-  val CONFIG_RESOURCE_KEY="config.resource"
-  val CONFIG_FILE_KEY="config.file"
-  val USAGE =
-    """
-      |Usage: java org.apache.eagle.stream.pipeline.Pipeline [options]
-      |
-      |Options:
-      |   --pipeline   pipeline configuration
-      |   --conf       common configuration
-      |   --env        storm (support spark, etc later)
-      |   --mode       local/remote/cluster
-    """.stripMargin
-
-  /** The inherited options plus --pipeline and --conf, each taking one argument. */
-  override protected def options(): Options = {
-    val options = super.options()
-    options.addOption(PIPELINE_OPT_KEY, true, "Pipeline configuration file")
-    options.addOption(CONFIG_OPT_KEY, true, "Config properties file")
-    options
-  }
-
-  /**
-    * Adds the values of --pipeline and --conf to the inherited result:
-    * --pipeline is stored under "pipeline.config"; --conf is stored under
-    * "config.file" when it contains a "/", otherwise under "config.resource".
-    * Throws IllegalArgumentException when either option is present without a value.
-    */
-  override protected def parseCommand(cmd: CommandLine): util.Map[String, String] = {
-    val map = super.parseCommand(cmd)
-
-    if (cmd.hasOption(PIPELINE_OPT_KEY)) {
-      val pipelineConf = cmd.getOptionValue(PIPELINE_OPT_KEY)
-      if(pipelineConf == null){
-        throw new IllegalArgumentException(s"--$PIPELINE_OPT_KEY should not be null")
-      } else {
-        LOG.info(s"Set $PIPELINE_CONFIG_KEY as $pipelineConf")
-        map.put(PIPELINE_CONFIG_KEY, pipelineConf)
-      }
-    }
-
-    if(cmd.hasOption(CONFIG_OPT_KEY)){
-      val commonConf = cmd.getOptionValue(CONFIG_OPT_KEY)
-      // Same guard as for --pipeline, instead of a NullPointerException on contains
-      if(commonConf == null){
-        throw new IllegalArgumentException(s"--$CONFIG_OPT_KEY should not be null")
-      } else if(commonConf.contains("/")){
-        LOG.info(s"Set $CONFIG_FILE_KEY as $commonConf")
-        map.put(CONFIG_FILE_KEY, commonConf)
-      }else {
-        LOG.info(s"Set $CONFIG_RESOURCE_KEY $commonConf")
-        map.put(CONFIG_RESOURCE_KEY, commonConf)
-      }
-    }
-    map
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
deleted file mode 100644
index 1102a33..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.utils
-
-class ParseException(message:String) extends Exception(message)
-class CompileException(message:String) extends Exception(message)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
deleted file mode 100644
index 3e8f69c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	"eagleProps" : {
-		"dataJoinPollIntervalSec" : 30
-		"mailHost" : "smtp.server.host"
-		"mailSmtpPort":"25"
-		"mailDebug" : "true"
-		"eagleService": {
-			"host": "localhost"
-			"port": 9099
-			"username": "admin"
-			"password": "secret"
-		}
-	}
-	"dynamicConfigSource" : {
-		"enabled" : true
-		"initDelayMillis" : 0
-		"delayMillis" : 30000
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
deleted file mode 100644
index 4250681..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# ./eagle-pipeline.sh --pipeline [pipeline-definition-config] --config [base-configuration]
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
deleted file mode 100644
index c8a4f46..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
deleted file mode 100644
index 6dddf7a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
+++ /dev/null
@@ -1,131 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	config {
-		alertExecutorConfigs {
-			defaultAlertExecutor  {
-				"parallelism" : 1
-				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-				"needValidation" : "true"
-			}
-		}
-		eagleProps  {
-			"site" : "sandbox"
-			"application": "eventSource"
-			"dataJoinPollIntervalSec" : 30
-			"mailHost" : "mail.host.com"
-			"mailSmtpPort":"25"
-			"mailDebug" : "true"
-			"eagleService": {
-				"host": "localhost"
-				"port": 38080
-				"username": "admin"
-				"password": "secret"
-			}
-		}
-		dynamicConfigSource  {
-			"enabled" : true
-			"initDelayMillis" : 0
-			"delayMillis" : 30000
-		}
-	}
-
-	schema {
-		metricStreamSchema {
-			metric: string
-			value: double
-			timestamp: long
-		}
-	}
-
-	dataflow {
-		KafkaSource.metricStream_1 {
-			parallism = 1000
-			topic = "metric_event_1"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-			schema = "metricStreamSchema"
-		}
-
-		KafkaSource.metricStream_2 {
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.metricStream_3{
-			parallism = 1000
-			topic = "metric_event_3"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-			schema = "metricStreamSchema"
-		}
-
-		KafkaSink.metricStore {
-			schema = "metricStreamSchema"
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		Alert.alert {
-//			upStreamNames = [metricStream_1,metricStream_2]
-			alertExecutorId = defaultAlertExecutor
-		}
-
-//		aggregator.aggreator {
-//			executor = "aggreationExecutor"
-//		}
-
-		metricStream_1|metricStream_2 -> alert {
-			group = shuffle
-		}
-
-		metricStream_1|metricStream_2 -> metricStore {
-			group = shuffle
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
deleted file mode 100644
index 5e3561a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	config {
-		envContextConfig {
-			"env" : "storm"
-			"mode" : "local"
-			"topologyName" : "dsl-based-topology"
-		}
-		eagleProps  {
-			"site" : "sandbox"
-			"application": "eventSource"
-			"dataJoinPollIntervalSec" : 30
-			"mailHost" : "mail.host.com"
-			"mailSmtpPort":"25"
-			"mailDebug" : "true"
-			"eagleService": {
-				"host": "localhost"
-				"port": 38080
-				"username": "admin"
-				"password": "secret"
-			}
-		}
-		dynamicConfigSource  {
-			"enabled" : true
-			"initDelayMillis" : 0
-			"delayMillis" : 30000
-		}
-	}
-
-	dataflow {
-		KafkaSource.metricStream_1 {
-			parallism = 1000
-			topic = "metric_event_1"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.metricStream_2 {
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.metricStream_3{
-			parallism = 1000
-			topic = "metric_event_3"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		Console.printer {}
-
-		metricStream_1|metricStream_2|metricStream_3 -> printer {
-			grouping = shuffle
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
deleted file mode 100644
index 9dc7ce3..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
+++ /dev/null
@@ -1,152 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	config {
-		envContextConfig {
-			"env" : "storm"
-			"mode" : "local"
-			"topologyName" : "dsl-based-topology"
-			"parallelismConfig" : {
-				"kafkaMsgConsumer" : 1
-			}
-		}
-		alertExecutorConfigs {
-			defaultAlertExecutor  {
-				"parallelism" : 1
-				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-				"needValidation" : "true"
-			}
-		}
-		eagleProps  {
-			"site" : "sandbox"
-			"application": "HADOOP"
-			"dataJoinPollIntervalSec" : 30
-			"mailHost" : "some.mail.server"
-			"mailSmtpPort":"25"
-			"mailDebug" : "true"
-			"eagleService": {
-				"host": "localhost"
-				"port": 38080
-				"username": "admin"
-				"password": "secret"
-			}
-		}
-		dynamicConfigSource  {
-			"enabled" : true
-			"initDelayMillis" : 0
-			"delayMillis" : 30000
-		}
-	}
-
-	schema {
-//		JmxStreamOne {
-//			attributes {
-//				metric: string
-//				value: double
-//				timestamp: long
-//			}
-//			alertExecutorId = [defaultAlertExecutor,anotherAlertExecutor]
-//		}
-		JmxStreamOne {
-			metric: string
-			value: double
-			timestamp: long
-		}
-	}
-
-	dataflow {
-		KafkaSource.JmxStreamOne {
-			parallism = 1000
-			topic = "metric_event_1"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamTwo {
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamThree{
-			parallism = 1000
-			topic = "metric_event_3"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		Console.printer {
-			format = "%s"
-		}
-
-		KafkaSink.metricStore {
-			topic = "metric_event_persist"
-		}
-
-//		KafkaSink.alertStore {
-//			"topic" = "alert_persist"
-//			"bootstrap.servers" = "localhost:6667"
-//		}
-
-		Alert.alert {
-			inputs = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
-
-			upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
-			alertExecutorId = defaultAlertExecutor
-		}
-
-//		Aggregator.aggreator {
-//			upStreamNames = []
-//			analyzerId = ""
-//			cepQl = ""
-//			strategy = ""
-//		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alert {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
-			grouping = shuffle
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
deleted file mode 100644
index 9c35456..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
+++ /dev/null
@@ -1,125 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	config {
-		envContextConfig {
-			"env" : "storm"
-			"mode" : "local"
-			"topologyName" : "dsl-topology"
-			"parallelismConfig" : {
-				"kafkaMsgConsumer" : 1
-			},
-		}
-		alertExecutorConfigs {
-			defaultAlertExecutor  {
-				"parallelism" : 1
-				"partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-				"needValidation" : "true"
-			}
-		}
-		eagleProps {
-			"site" : "sandbox"
-			"application": "HADOOP"
-		}
-	}
-	
-	dataflow {
-		KafkaSource.JmxStreamOne {
-			parallism = 1000
-			topic = "metric_event_1"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamTwo {
-			parallism = 1000
-			topic = "metric_event_2"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		KafkaSource.JmxStreamThree{
-			parallism = 1000
-			topic = "metric_event_3"
-			zkConnection = "localhost:2181"
-			zkConnectionTimeoutMS = 15000
-			consumerGroupId = "Consumer"
-			fetchSize = 1048586
-			transactionZKServers = "localhost"
-			transactionZKPort = 2181
-			transactionZKRoot = "/consumers"
-			transactionStateUpdateMS = 2000
-			deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
-		}
-
-		Console.printer {
-			format = "%s"
-		}
-
-		KafkaSink.metricStore {
-			topic = "metric_event_persist"
-		}
-
-//		KafkaSink.aggSink {
-//			topic = "metric_agg_persist"
-//		}
-
-		Alert.defaultAlertExecutor {
-			// upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
-			// alertExecutorId = defaultAlertExecutor
-		}
-
-		Aggregator.Aggregator{ sql = """
-				define stream JmxStreamOne(eagleAlertContext object, timestamp long, metric string, value double);
-				@info(name = "query")
-				from JmxStreamOne[value > 100.0] select * insert into outputStream;
-		"""}
-
-
-		JmxStreamOne -> Aggregator {}
-
-		Aggregator -> printer {}
-
-//		Aggregator -> aggregatedSink{
-//			grouping = shuffle
-//		}
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
-			grouping = shuffle
-		}
-
-		JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
-			grouping = shuffle
-		}
-	}
-}
\ No newline at end of file