You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/14 06:23:08 UTC
[08/13] incubator-eagle git commit: EAGLE-341 clean inner process
alert engine code clean inner process alert engine code
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf
deleted file mode 100644
index 4c21a7c..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/application.conf
+++ /dev/null
@@ -1,42 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-### scheduler propertise
-appCommandLoaderIntervalSecs = 1
-appHealthCheckIntervalSecs = 5
-
-### execution platform properties
-envContextConfig.env = "storm"
-envContextConfig.url = "http://sandbox.hortonworks.com:8744"
-envContextConfig.nimbusHost = "sandbox.hortonworks.com"
-envContextConfig.nimbusThriftPort = 6627
-envContextConfig.jarFile = "/dir-to-jar/eagle-topology-0.3.0-incubating-assembly.jar"
-
-### default topology properties
-eagleProps.mailHost = "mailHost.com"
-eagleProps.mailSmtpPort = "25"
-eagleProps.mailDebug = "true"
-eagleProps.eagleService.host = "localhost"
-eagleProps.eagleService.port = 9099
-eagleProps.eagleService.username = "admin"
-eagleProps.eagleService.password = "secret"
-eagleProps.dataJoinPollIntervalSec = 30
-
-dynamicConfigSource.enabled = true
-dynamicConfigSource.initDelayMillis = 0
-dynamicConfigSource.delayMillis = 30000
-
-
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties
deleted file mode 100644
index 25331ab..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-
- eagle.log.dir=../logs
- eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
- log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
- log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
- log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-## 30-day backup
-# log4j.appender.DRFA.MaxBackupIndex=30
- log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
deleted file mode 100644
index e87ee92..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import com.typesafe.config.Config
-import org.apache.eagle.stream.application.TopologyExecutable
-import org.slf4j.LoggerFactory
-
-class MockTopology extends TopologyExecutable {
- private val LOG = LoggerFactory.getLogger(classOf[MockTopology])
- override def submit(topology: String, config: Config): Unit = {
- LOG.info(s"$topology is running")
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
deleted file mode 100644
index 1cad3a7..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.eagle.stream.application.scheduler
-
-import com.typesafe.config.ConfigFactory
-import org.apache.eagle.common.config.EagleConfigConstants
-import org.apache.eagle.stream.application.ExecutionPlatform
-import org.apache.eagle.stream.application.impl.StormExecutionPlatform
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-
-object StormApplicationManagerSpec extends App {
- val manager: ExecutionPlatform = new StormExecutionPlatform
- val baseConfig = ConfigFactory.load()
- val topoConfigStr: String = "webConfig{\"hbase.zookeeper.property.clientPort\":\"2181\", \"hbase.zookeeper.quorum\":\"localhost\"}\nappConfig{\n \"envContextConfig\" : {\n \"env\" : \"storm\",\n \"mode\" : \"cluster\",\n \"topologyName\" : \"sandbox-hbaseSecurityLog-topology\",\n \"stormConfigFile\" : \"security-auditlog-storm.yaml\",\n \"parallelismConfig\" : {\n \"kafkaMsgConsumer\" : 1,\n \"hbaseSecurityLogAlertExecutor*\" : 1\n }\n },\n \"dataSourceConfig\": {\n \"topic\" : \"sandbox_hbase_security_log\",\n \"zkConnection\" : \"127.0.0.1:2181\",\n \"zkConnectionTimeoutMS\" : 15000,\n \"brokerZkPath\" : \"/brokers\",\n \"fetchSize\" : 1048586,\n \"deserializerClass\" : \"org.apache.eagle.security.hbase.parse.HbaseAuditLogKafkaDeserializer\",\n \"transactionZKServers\" : \"127.0.0.1\",\n \"transactionZKPort\" : 2181,\n \"transactionZKRoot\" : \"/consumers\",\n \"consumerGroupId\" : \"eagle.hbasesecurity.consumer\",\n
\"transactionStateUpdateMS\" : 2000\n },\n \"alertExecutorConfigs\" : {\n \"hbaseSecurityLogAlertExecutor\" : {\n \"parallelism\" : 1,\n \"partitioner\" : \"org.apache.eagle.policy.DefaultPolicyPartitioner\"\n \"needValidation\" : \"true\"\n }\n },\n \"eagleProps\" : {\n \"site\" : \"sandbox\",\n \"application\": \"hbaseSecurityLog\",\n \"dataJoinPollIntervalSec\" : 30,\n \"mailHost\" : \"mailHost.com\",\n \"mailSmtpPort\":\"25\",\n \"mailDebug\" : \"true\",\n \"eagleService\": {\n \"host\": \"localhost\",\n \"port\": 9099\n \"username\": \"admin\",\n \"password\": \"secret\"\n }\n },\n \"dynamicConfigSource\" : {\n \"enabled\" : true,\n \"initDelayMillis\" : 0,\n \"delayMillis\" : 30000\n }\n}"
-
- val topoConfig = ConfigFactory.parseString(topoConfigStr)
- val conf = topoConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(baseConfig)
-
- //val (ret, nextState) = manager.execute("START", topologyDescModel, null, conf)
- //println(s"Result: ret=$ret, nextState=$nextState")
-}
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
deleted file mode 100644
index 3db2d67..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import akka.actor.{ActorSystem, Props}
-import akka.testkit.{TestActorRef, TestKit}
-import com.typesafe.config.ConfigFactory
-import org.scalatest.{Ignore, BeforeAndAfterAll, MustMatchers, WordSpecLike}
-
-@Ignore
-class TestSchedulerSpec extends TestKit(ActorSystem("stream-app-scheduler"))
-with WordSpecLike with MustMatchers with BeforeAndAfterAll {
-
- "A Scheduler actor" must {
- "Forward a message it receives" in {
- val coordinator = TestActorRef[StreamAppCoordinator]
- coordinator ! CommandLoaderEvent
- expectNoMsg()
- }
- }
-
- "A Integrated test" must {
- "run end-to-end" in {
- val coordinator = system.actorOf(Props[StreamAppCoordinator])
- coordinator ! CommandLoaderEvent
- expectNoMsg()
- }
- }
-
- override def afterAll(): Unit = {
- super.afterAll()
- system.shutdown()
- }
-}
-
-@Ignore
-object TestStreamAppScheduler extends App {
- val conf: String = """
- akka.loglevel = "DEBUG"
- akka.actor.debug {
- receive = on
- lifecycle = on
- }
- """
- new ApplicationScheduler().start(ConfigFactory.parseString(conf))
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/pom.xml b/eagle-core/eagle-app/pom.xml
index 6f3069c..5637b01 100644
--- a/eagle-core/eagle-app/pom.xml
+++ b/eagle-core/eagle-app/pom.xml
@@ -32,7 +32,6 @@
<modules>
<module>eagle-app-base</module>
- <module>eagle-stream-application-manager</module>
<module>eagle-application-service</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md b/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
deleted file mode 100644
index b8a0bdc..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/README.md
+++ /dev/null
@@ -1,80 +0,0 @@
-<!--
-{% comment %}
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements. See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to you under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License. You may obtain a copy of the License at
-
-http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-{% endcomment %}
--->
-
-Eagle Declarative Streaming DSL
-===============================
-
-DSL Format
-----------
-
- {
- config {
- config.key = configValue
- }
-
- schema {
- metricStreamSchema {
- metric: string
- value: double
- timestamp: long
- }
- }
-
- dataflow {
- kafkaSource.source1 {
- schema = "metricStreamSchema"
- }
- kafkaSource.source2 {
- schema = {
- metric: string
- value: double
- timestamp: long
- }
- }
- }
- }
-
-Usage
------
-
- val pipeline = Pipeline.parseResource("pipeline.conf")
- val stream = Pipeline.compile(pipeline)
- stream.submit[storm]
-
-Features
---------
-* [x] Compile DSL Configure to Pipeline model
-* [x] Compile Pipeline model to Stream Execution Graph
-* [x] Submit Stream Execution Graph to actual running environment say storm
-* [x] Support Alert and Persistence for metric monitoring
-* [ ] Extensible stream module management and automatically scan and register module
-* [x] Pipeline runner CLI tool and shell script
-* [ ] Decouple pipeline compiler and scheduler into individual modules
-* [ ] Stream Pipeline Scheduler
-* [ ] Graph editor to define streaming graph in UI
-* [?] JSON/Config & Scala Case Class Mapping (https://github.com/scala/pickling)
-* [?] Raw message structure oriented programing is a little ugly, we should define a generic message/event consist of [payload:stream/timestamp/serializer/deserializer,data:message]
-* [ ] Provide stream schema inline and send to metadata when submitting
-* [ ] UI should support specify executorId when defining new stream
-* [ ] Lack of a entity named StreamEntity for the workflow of defining topology&policy end-to-end
-* [!] Fix configuration conflict, should pass through Config instead of ConfigFactory.load() manually
-* [ ] Override application configuration with pipeline configuration
-* [ ] Refactor schema registration structure and automatically submit stream schema when submitting pipeline
-* [ ] Submit alertStream, alertExecutorId mapping to AlertExecutorService when submitting pipeline
-* [x] Supports `inputs` field to define connector
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml b/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
deleted file mode 100644
index 18fb610..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/pom.xml
+++ /dev/null
@@ -1,156 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>eagle-data-process-parent</artifactId>
- <groupId>org.apache.eagle</groupId>
- <version>0.5.0-incubating-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
- <artifactId>eagle-stream-pipeline</artifactId>
- <dependencies>
- <!--<dependency>-->
- <!--<groupId>org.reflections</groupId>-->
- <!--<artifactId>reflections</artifactId>-->
- <!--</dependency>-->
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-service-base</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.ow2.asm</groupId>
- <artifactId>asm-all</artifactId>
- </exclusion>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm-all</artifactId>
- </exclusion>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm-commons</artifactId>
- </exclusion>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm-tree</artifactId>
- </exclusion>
- <exclusion>
- <artifactId>servlet-api</artifactId>
- <groupId>javax.servlet</groupId>
- </exclusion>
- <exclusion>
- <artifactId>slf4j-log4j12</artifactId>
- <groupId>org.slf4j</groupId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-storage-base</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </dependency>
-<!-- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <scope>test</scope>
- </dependency>
--->
- <!--<dependency>-->
- <!--<groupId>org.scala-lang</groupId>-->
- <!--<artifactId>scala-reflect</artifactId>-->
- <!--</dependency>-->
- <!--<dependency>-->
- <!--<groupId>org.scala-lang</groupId>-->
- <!--<artifactId>scala-compiler</artifactId>-->
- <!--<version>${scala.version}.0</version>-->
- <!--</dependency>-->
- <dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_${scala.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-stream-process-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- <!--<dependency>-->
- <!--<groupId>com.typesafe.akka</groupId>-->
- <!--<artifactId>akka-testkit_${scala.version}</artifactId>-->
- <!--<version>${akka.actor.version}</version>-->
- <!--<scope>test</scope>-->
- <!--</dependency>-->
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <executions>
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>add-source</goal>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skipTests>true</skipTests>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <configuration>
- <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <junitxml>.</junitxml>
- <filereports>TestSuite.txt</filereports>
- </configuration>
- <executions>
- <execution>
- <id>test</id>
- <goals>
- <goal>test</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
deleted file mode 100644
index 65ab390..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/Pipeline.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline
-
-
-import org.apache.eagle.stream.pipeline.compiler.PipelineCompiler
-import org.apache.eagle.stream.pipeline.parser.PipelineParser
-import org.apache.eagle.stream.pipeline.runner.PipelineRunner
-
-object Pipeline
- extends PipelineRunner
- with PipelineParser
- with PipelineCompiler
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
deleted file mode 100644
index 2ff81d4..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/annotation/Extension.scala
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.annotation
-
-import scala.annotation.StaticAnnotation
-
-case class Extension(extType:String) extends StaticAnnotation
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
deleted file mode 100644
index df8bbe5..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/compiler/PipelineCompiler.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.compiler
-
-
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.stream.pipeline.extension.ModuleManager._
-import org.apache.eagle.stream.pipeline.parser._
-import org.apache.eagle.stream.pipeline.utils.CompileException
-
-trait PipelineCompiler {
- def compile(pipeline:Pipeline):StreamContext = {
- val context = new StreamContext(pipeline.config)
- val dataflow = pipeline.dataflow
- val dag = new StreamDAG(context.dag)
- dataflow.getProcessors.map(buildStreamProducer(dag,_)).foreach(producer =>{
- producer.initWith(dag.graph,pipeline.config)
- dag.addVertex(producer)
- })
- dataflow.getConnectors.foreach(connector =>{
- val from = dag.getNodeByName(connector.from).get
- val to = dag.getNodeByName(connector.to).get
- dag.addEdge(from,to,buildStreamConnector(from,to,dataflow,connector))
- })
- context
- }
- private def buildStreamProducer(dag:StreamDAG,processor:Processor):StreamProducer[Any] = {
- if(findModuleType(processor.getType)){
- getModuleMapperByType(processor.getType).map(processor).nameAs(processor.getId).stream(processor.streamId)
- } else {
- throw new CompileException(s"Unknown processor type [${processor.getType}]")
- }
- }
- private def buildStreamConnector(from:StreamProducer[Any],to:StreamProducer[Any],dataflow:DataFlow,connector:Connector):StreamConnector[Any,Any]={
- var groupByIndexes:Seq[Int] = connector.groupByIndexes.orNull
- if(groupByIndexes!=null ){
- if(connector.groupByFields.isDefined) throw new CompileException(s"Both ${Connector.GROUP_BY_FIELD_FIELD} and ${Connector.GROUP_BY_INDEX_FIELD} is defined at same time")
- } else if(connector.groupByFields.isDefined){
- groupByIndexes = connector.groupByFields.get.map(dataflow.getProcessor(from.name).get.getSchema.get.indexOfAttribute)
- }
- if(groupByIndexes == null){
- ShuffleConnector(from,to)
- } else {
- GroupbyFieldsConnector(from,to,groupByIndexes)
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
deleted file mode 100644
index 2174560..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/extension/ModuleManager.scala
+++ /dev/null
@@ -1,186 +0,0 @@
-package org.apache.eagle.stream.pipeline.extension
-
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicBoolean
-
-import com.typesafe.config.ConfigFactory
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider
-import org.apache.eagle.datastream.core._
-import org.apache.eagle.partition.PartitionStrategy
-import org.apache.eagle.stream.pipeline.parser.Processor
-import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
-//import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConverters._
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
- /**
-  * Registry that resolves a processor type name to the ModuleMapper able to build it.
-  */
- object ModuleManager{
-   /** Processor type name -> mapper. */
-   val classOfProcessorMapping = Map[String,ModuleMapper](
-     ("KafkaSource", KafkaSourceStreamProducer),
-     ("KafkaSink", KafkaSinkStreamProducer),
-     ("Alert", AlertStreamProducer),
-     ("Persistence", PersistProducer),
-     ("Aggregator", AggregatorProducer),
-     ("Console", ConsoleStreamProducer)
-   )
-
-   /** @return true when a mapper is registered for the given type name */
-   def findModuleType(moduleType:String):Boolean = {
-     classOfProcessorMapping.isDefinedAt(moduleType)
-   }
-
-   /**
-    * @return the mapper registered for the given type name
-    * @throws NoSuchElementException if the type name is not registered
-    */
-   def getModuleMapperByType(moduleType:String):ModuleMapper = {
-     classOfProcessorMapping.apply(moduleType)
-   }
- }
-
- /**
-  * Contract for turning a parsed pipeline Processor definition into a StreamProducer.
-  */
- trait ModuleMapper{
- /** Processor type name handled by this mapper. */
- def getType:String
- /** Builds the stream producer described by the given processor definition. */
- def map(module:Processor):StreamProducer[Any]
- }
- /**
-  * Mapper for "KafkaSource" processors: wraps a Kafka-sourced spout, configured from the
-  * processor's settings, into a storm source producer.
-  */
- object KafkaSourceStreamProducer extends ModuleMapper{
-   def getType = "KafkaSource"
-
-   override def map(module:Processor): StreamProducer[Any] = {
-     val settings = module.getConfig
-     val provider = new KafkaSourcedSpoutProvider(null)
-     val spout = provider.getSpout(ConfigFactory.parseMap(settings.asJava))
-     new StormSourceProducer[Any](spout)
-   }
- }
- /**
-  * Mapper for "KafkaSink" processors: applies a KafkaSinkExecutor, built from the
-  * processor's settings, to every element of the stream.
-  */
- object KafkaSinkStreamProducer extends ModuleMapper{
-   def getType = "KafkaSink"
-
-   override def map(module:Processor): StreamProducer[Any] = {
-     val sink = KafkaSinkExecutor(module.getConfig)
-     ForeachProducer[AnyRef](sink)
-   }
- }
- /**
-  * Mapper for "Console" processors: prints every stream element followed by a newline.
-  */
- object ConsoleStreamProducer extends ModuleMapper{
-   // Matches the "Console" key this mapper is registered under in ModuleManager; every other
-   // mapper reports its registry key here as well (this one previously reported "Stdout").
-   override def getType: String = "Console"
-
-   override def map(module:Processor): StreamProducer[Any] = ForeachProducer[Any](m=>print(s"$m\n"))
- }
- /**
-  * Mapper for "Alert" processors.
-  *
-  * Recognised settings: "upStreamNames" (defaults to the processor's declared inputs),
-  * "alertExecutorId" (defaults to the processor id), "consume" (defaults to true) and
-  * "strategy" (class name of a PartitionStrategy, instantiated reflectively; null if absent).
-  */
- object AlertStreamProducer extends ModuleMapper{
-   def getType:String = "Alert"
-
-   override def map(module:Processor): StreamProducer[Any] = {
-     val settings = module.getConfig
-     // Only evaluated when "upStreamNames" is not configured
-     def declaredInputs:java.util.List[String] = if(module.inputIds == null) null else module.inputIds.asJava
-
-     val upStreams = settings.getOrElse("upStreamNames",declaredInputs).asInstanceOf[java.util.List[String]]
-     val executorId = settings.getOrElse("alertExecutorId",module.getId).asInstanceOf[String]
-     val consumeFlag = settings.getOrElse("consume",true).asInstanceOf[Boolean]
-     val partitioner = settings.get("strategy")
-       .map(className => Class.forName(className.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy])
-       .orNull
-
-     new AlertStreamProducer (
-       upStreamNames = upStreams,
-       alertExecutorId = executorId,
-       consume = consumeFlag,
-       strategy = partitioner
-     )
-   }
- }
-
- /**
-  * Mapper for "Persistence" processors.
-  *
-  * Recognised settings: "executorId" (defaults to the processor id) and "storageType"
-  * (looked up by name in StorageType; null is passed to the lookup when it is absent).
-  */
- object PersistProducer extends ModuleMapper{
-   override def getType = "Persistence"
-
-   override def map(module:Processor): StreamProducer[Any] = {
-     val settings = module.getConfig
-     val executorId = settings.getOrElse("executorId",module.getId).asInstanceOf[String]
-     val storageTypeName = settings.getOrElse("storageType",null).asInstanceOf[String]
-     new PersistProducer(executorId,StorageType.withName(storageTypeName))
-   }
- }
-
- /**
-  * Mapper for "Aggregator" processors.
-  *
-  * Recognised settings: "upStreamNames" (defaults to the processor's declared inputs),
-  * "analyzer" (defaults to the processor id), "sql" (null if absent) and "strategy"
-  * (class name of a PartitionStrategy, instantiated reflectively; null if absent).
-  */
- object AggregatorProducer extends ModuleMapper{
-   override def getType: String = "Aggregator"
-
-   override def map(module:Processor): StreamProducer[Any] = {
-     val settings = module.getConfig
-     // Only evaluated when "upStreamNames" is not configured
-     def declaredInputs:java.util.List[String] = if(module.inputIds == null) null else module.inputIds.asJava
-
-     val upStreams = settings.getOrElse("upStreamNames",declaredInputs).asInstanceOf[java.util.List[String]]
-     val analyzerId = settings.getOrElse("analyzer",module.getId).asInstanceOf[String]
-     val sql = settings.get("sql").map(_.asInstanceOf[String]).orNull
-     val partitioner = settings.get("strategy")
-       .map(className => Class.forName(className.asInstanceOf[String]).newInstance().asInstanceOf[PartitionStrategy])
-       .orNull
-
-     new AggregateProducer(upStreams,analyzerId,sql,partitioner)
-   }
- }
-
- // Explicit companion of the KafkaSinkExecutor case class; its only member, a logger,
- // is currently commented out.
- object KafkaSinkExecutor{
-// val LOG = LoggerFactory.getLogger(classOf[KafkaSinkExecutor])
- }
-
-/**
- * @todo currently support single topic now, should support topic selector
- * @param config
- */
-case class KafkaSinkExecutor(config:Map[String,AnyRef]) extends ((AnyRef) => Unit) with Serializable{
-
- val TOPIC_KEY = "topic"
- def getDefaultProps = {
- val props = new Properties()
- props.putAll(Map[String,AnyRef](
- "bootstrap.servers" -> "localhost:6667",
- "acks" -> "all",
- "retries" -> "3",
- "batch.size" -> "16384",
- "linger.ms" -> "1",
- "buffer.memory" -> "33554432",
- "key.serializer" -> "org.apache.kafka.common.serialization.StringSerializer",
- "value.serializer" -> classOf[org.apache.eagle.dataproc.impl.storm.kafka.JsonSerializer].getCanonicalName
- ).asJava)
- props
- }
-
- @transient var initialized:AtomicBoolean = new AtomicBoolean(false)
- @transient var producer:KafkaProducer[String,AnyRef] = null
- @transient var topic:String = null
- @transient var timeoutMs:Long = 3000
-
- private def init():Unit = {
- if(this.initialized != null && this.initialized.get()){
-// LOG.info("Already initialized, skip")
- return
- }
- this.initialized = new AtomicBoolean(false)
- if (producer != null) {
-// LOG.info(s"Closing $producer")
- producer.close()
- }
-// LOG.info("Initializing and creating Kafka Producer")
- if (config.contains(TOPIC_KEY)) {
- this.topic = config.get(TOPIC_KEY).get.asInstanceOf[String]
- } else {
- throw new IllegalStateException("topic is not defined")
- }
- val props = getDefaultProps
- props.putAll((config - TOPIC_KEY).asJava)
- producer = new KafkaProducer[String, AnyRef](props)
-// LOG.info(s"Created new KafkaProducer: $producer")
- initialized.set(true)
- }
-
- override def apply(value: AnyRef): Unit = {
- if(initialized == null || !initialized.get()) init()
- if(topic == null) throw new IllegalStateException("topic is not defined")
- val isList = value.isInstanceOf[java.util.List[AnyRef]]
- val record: ProducerRecord[String, AnyRef] = if(isList){
- val list = value.asInstanceOf[java.util.List[AnyRef]]
- if(list.size() == 1) {
- new ProducerRecord[String, AnyRef](topic, value.asInstanceOf[java.util.List[AnyRef]].get(0))
- }else{
- new ProducerRecord[String, AnyRef](topic, value)
- }
- }else{
- new ProducerRecord[String, AnyRef](topic,value)
- }
- producer.send(record,new Callback(){
- override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
- if(exception!=null){
-// LOG.error(s"Failed to send record $value to topic: $topic",exception)
- throw new IllegalStateException(s"Failed to send record $value to topic: $topic",exception)
- }
- }
- })
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
deleted file mode 100644
index 7e1f4cf..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/DataFlow.scala
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.parser
-
-import com.typesafe.config.Config
-import org.apache.eagle.stream.pipeline.utils.ParseException
-
-import scala.collection.JavaConversions._
-import scala.collection.JavaConverters._
-
-import scala.collection.mutable
-
-
- /**
-  * Mutable model of a parsed dataflow: processors keyed by id plus the connectors
-  * (directed edges) between them.
-  */
- class DataFlow {
-   private var processors = mutable.Map[String,Processor]()
-   private var connectors = mutable.Seq[Connector]()
-
-   /**
-    * @param id id of the downstream processor
-    * @return the processors that have a connector pointing to `id`
-    * @throws ParseException if a connector refers to a source processor that is not defined
-    */
-   def getInputs(id: String):Seq[Processor] = {
-     getConnectors.filter(_.to == id).map { connector =>
-       // Report the dangling reference by name instead of failing with a bare None.get
-       getProcessor(connector.from).getOrElse {
-         throw new ParseException(s"Stream [${connector.from}] is not defined before being referred")
-       }
-     }
-   }
-
-   /**
-    * Connect if not, do nothing if already connected
-    *
-    * @param from id of the upstream processor
-    * @param to id of the downstream processor
-    */
-   def connect(from: String, to: String): Unit = {
-     val alreadyConnected = connectors.exists(c => c.from == from && c.to == to)
-     // An empty (not null) config keeps the Connector accessors safe to call
-     if(!alreadyConnected) addConnector(Connector(from,to,Map.empty[String,AnyRef]))
-   }
-
-   /** Registers the given processors by id, on top of the ones already present. */
-   def setProcessors(processors:Seq[Processor]):Unit = {
-     processors.foreach{module =>
-       this.processors.put(module.getId,module)
-     }
-   }
-
-   /** Replaces the processor table with the given map. */
-   def setProcessors(processors:mutable.Map[String,Processor]):Unit = {
-     this.processors = processors
-   }
-
-   /** Appends the given connectors to the ones already present. */
-   def setConnectors(connectors:Seq[Connector]):Unit = {
-     connectors.foreach(connector =>{
-       this.connectors :+= connector
-     })
-   }
-
-   /**
-    * Registers a processor.
-    *
-    * @throws IllegalArgumentException if a processor with the same id is already registered
-    */
-   def addProcessor(module:Processor):Unit = {
-     if(contains(module)) throw new IllegalArgumentException(s"Duplicated processor id error, ${module.getId} has already been defined as ${getProcessor(module.getId)}")
-     processors.put(module.getId,module)
-   }
-
-   /** @return true when a processor with the same id is registered */
-   def contains(module:Processor):Boolean = processors.contains(module.getId)
-
-   /** Appends a connector without checking for duplicates. */
-   def addConnector(connector:Connector):Unit = {
-     connectors :+= connector
-   }
-
-   def getProcessors:Seq[Processor] = processors.values.toSeq
-   def getProcessor(processorId:String):Option[Processor] = processors.get(processorId)
-   def getConnectors:Seq[Connector] = connectors
- }
-
-/**
- * Stream Processor
- *
- * @param processorId
- * @param processorType
- * @param schema
- * @param processorConfig
- */
-case class Processor(var processorId:String = null,var processorType:String = null,var schema:Schema = null, var processorConfig:Map[String,AnyRef] = null) extends Serializable {
- private[pipeline] var inputs:Seq[Processor] = null
- private[pipeline] var inputIds:Seq[String] = null
-
- def getId:String = processorId
- def getType:String = processorType
- def getConfig:Map[String,AnyRef] = processorConfig
- def getSchema:Option[Schema] = if(schema == null) None else Some(schema)
-
- /**
- * @todo assume processorId as streamId
- * @return
- */
- def streamId = processorId
-}
-
- /**
-  * Directed edge between two processors, with optional grouping settings.
-  *
-  * @param from   id of the upstream processor
-  * @param to     id of the downstream processor
-  * @param config connector settings; null is treated like an empty map
-  */
- case class Connector (from:String,to:String, config:Map[String,AnyRef]) extends Serializable{
-   import Connector._
-
-   // Connectors can be created with a null config; read through this view so the
-   // accessors below return None instead of throwing a NullPointerException.
-   private def settings:Map[String,AnyRef] = if(config == null) Map.empty[String,AnyRef] else config
-
-   /** @return the value of the "grouping" setting, if present */
-   def group:Option[String] = settings.get(GROUP_FIELD).asInstanceOf[Option[String]]
-
-   /** @return the field names of the "groupByField" setting, if present */
-   def groupByFields:Option[Seq[String]] = settings.get(GROUP_BY_FIELD_FIELD) match {
-     case Some(obj) => Some(obj.asInstanceOf[java.util.List[String]].asScala.toSeq)
-     case None => None
-   }
-
-   /** @return the indexes of the "groupByIndex" setting, if present */
-   def groupByIndexes:Option[Seq[Int]] = settings.get(GROUP_BY_INDEX_FIELD) match {
-     case Some(obj) => Some(obj.asInstanceOf[java.util.List[java.lang.Integer]].asScala.toSeq.map(Int.unbox(_)))
-     case None => None
-   }
- }
-
- /** Setting keys understood by Connector. */
- object Connector{
-   val GROUP_FIELD = "grouping"
-   val GROUP_BY_FIELD_FIELD = "groupByField"
-   val GROUP_BY_INDEX_FIELD = "groupByIndex"
- }
-
- /**
-  * Factory that builds a Processor from its raw configuration map.
-  */
- private [pipeline]
- object Processor {
-   val SCHEMA_FIELD:String = "schema"
-   val INPUTS_FIELD = "inputs"
-
-   /**
-    * @param processorId   id of the processor
-    * @param processorType type name of the processor
-    * @param context       raw settings; "schema" may be a schema id or an inline definition,
-    *                      "inputs" may list the ids of upstream processors
-    * @param schemaSet     named schemas that a schema id is resolved against
-    * @return the processor, with "schema" removed from its settings
-    * @throws ParseException if the schema id is unknown or the schema value has an illegal type
-    */
-   def parse(processorId:String,processorType:String,context:Map[String,AnyRef], schemaSet:SchemaSet):Processor = {
-     val schema:Schema = context.get(SCHEMA_FIELD) match {
-       case None => null
-       case Some(schemaId:String) => schemaSet.get(schemaId).getOrElse {
-         throw new ParseException(s"Schema [$schemaId] is not found but referred by [$processorType:$processorId] in $context")
-       }
-       case Some(schemaMap:java.util.HashMap[String,AnyRef]) => Schema.parse(schemaMap.toMap)
-       case Some(schemaDef) => throw new ParseException(s"Illegal value for schema: $schemaDef")
-     }
-     val instance = new Processor(processorId,processorType,schema,context - SCHEMA_FIELD)
-     context.get(INPUTS_FIELD).foreach { ids =>
-       instance.inputIds = ids.asInstanceOf[java.util.List[String]].asScala.toSeq
-     }
-     instance
-   }
- }
-
-
- /**
-  * Parses the "dataflow" section of a pipeline configuration into a DataFlow.
-  *
-  * Each top-level key is either a processor type (whose children are processor definitions
-  * keyed by id) or a connection of the form `from -> to` / `a|b -> to`.
-  */
- trait DataFlowParser {
-   /**
-    * @param config    the dataflow configuration
-    * @param schemaSet named schemas that processors may refer to
-    * @return the parsed, connected and validated dataflow
-    * @throws ParseException if a definition is malformed or refers to an undefined processor
-    */
-   def parse(config:Config,schemaSet:SchemaSet = SchemaSet.empty()):DataFlow = {
-     val dataflow = new DataFlow()
-     val definitions = config.root().unwrapped().toMap
-
-     // Parse processors and connectors
-     definitions.foreach { case (identifier,definition) =>
-       parseSingle(identifier,definition.asInstanceOf[java.util.HashMap[String,AnyRef]].toMap,dataflow,schemaSet)
-     }
-     connectDeclaredInputs(dataflow)
-     // Validate before resolving inputs so that a reference to an undefined processor is
-     // reported as a ParseException rather than failing during resolution.
-     validate(dataflow)
-     resolveInputs(dataflow)
-     dataflow
-   }
-
-   /** Adds a connector for every declared input that is not connected yet. */
-   private def connectDeclaredInputs(dataflow: DataFlow):Unit = {
-     dataflow.getProcessors.foreach { proc =>
-       if(proc.inputIds != null) proc.inputIds.foreach(id => dataflow.connect(id,proc.getId))
-     }
-   }
-
-   /** Stores on every processor its upstream processors and their ids. */
-   private def resolveInputs(dataflow: DataFlow):Unit = {
-     dataflow.getProcessors.foreach { proc =>
-       proc.inputs = dataflow.getInputs(proc.getId)
-       proc.inputIds = proc.inputs.map(_.getId)
-     }
-   }
-
-   /** Checks that both ends of every connector are defined processors. */
-   private def validate(pipeline:DataFlow): Unit = {
-     def checkModuleExists(id:String): Unit = {
-       if(pipeline.getProcessor(id).isEmpty) {
-         throw new ParseException(s"Stream [$id] is not defined before being referred")
-       }
-     }
-
-     pipeline.getConnectors.foreach { connector =>
-       checkModuleExists(connector.from)
-       checkModuleExists(connector.to)
-     }
-   }
-
-   /** Parses one top-level entry: a group of processor definitions or a connection. */
-   private def parseSingle(identifier:String,config:Map[String,AnyRef],dataflow:DataFlow, schemaSet: SchemaSet):Unit = {
-     Identifier.parse(identifier) match {
-       case DefinitionIdentifier(processorType) =>
-         config.foreach { case (processorId,definition) =>
-           dataflow.addProcessor(Processor.parse(processorId, processorType,definition.asInstanceOf[java.util.HashMap[String, AnyRef]].toMap,schemaSet))
-         }
-       case ConnectionIdentifier(fromIds,toId) => fromIds.foreach { fromId =>
-         // Compare by value: `eq` is reference equality and does not detect equal ids
-         if(fromId == toId) throw new ParseException(s"Can't connect $fromId to $toId")
-         dataflow.addConnector(Connector(fromId,toId,config))
-       }
-       case _ => ???
-     }
-   }
- }
-
-
- /** Result of parsing a top-level key of a dataflow definition. */
- private[pipeline] trait Identifier
-
- /** Key that names a processor type. */
- private[pipeline] case class DefinitionIdentifier(moduleType: String) extends Identifier
- /** Key that describes connections from one or more source ids to one destination id. */
- private[pipeline] case class ConnectionIdentifier(fromIds: Seq[String], toId: String) extends Identifier
-
- /**
-  * Parser for the top-level keys of a dataflow definition.
-  */
- private[pipeline] object Identifier {
-   val ConnectorFlag = "->"
-   val UnitFlagSplitPattern = "\\|"
-   val UnitFlagChar = "|"
-   val ConnectorPattern = s"([\\w-|\\s]+)\\s+$ConnectorFlag\\s+([\\w-_]+)".r
-
-   /**
-    * @param identifier a key such as `Type`, `a -> b` or `a|b -> c`
-    * @return a ConnectionIdentifier when the key matches the connector pattern, otherwise a
-    *         DefinitionIdentifier carrying the key unchanged
-    * @throws ParseException if the key contains "->" but does not match the connector pattern
-    */
-   def parse(identifier: String): Identifier = {
-     // ${id} -> ${id}
-     ConnectorPattern.findFirstMatchIn(identifier) match {
-       case Some(matched) =>
-         // Split and trim in every case, so a single source gets the same whitespace
-         // handling as a `|`-separated list of sources.
-         val sources = matched.group(1).split(UnitFlagSplitPattern).toSeq.map(_.trim())
-         ConnectionIdentifier(sources,matched.group(2))
-       case None =>
-         if(identifier.contains(ConnectorFlag)) throw new ParseException(s"Failed to parse $identifier")
-         DefinitionIdentifier(identifier)
-     }
-   }
- }
-
- /** Companion of DataFlow that exposes the DataFlowParser operations, e.g. DataFlow.parse. */
- object DataFlow extends DataFlowParser
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
deleted file mode 100644
index eb09156..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-package org.apache.eagle.stream.pipeline.parser
-
-import java.io.File
-
-import com.typesafe.config.{Config, ConfigFactory}
-
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- /** A parsed pipeline: its configuration together with its dataflow. */
- case class Pipeline(config:Config,dataflow:DataFlow)
-
-/**
- * Pipeline configuration parser
- *
- * For example:
- *
- * {{{
- * <code>
- * {
- * config {
- * execution.environment.config = someValue
- * }
- * schema {
- * metricStreamSchema {
- * metric: string
- * value: double
- * timestamp: long
- * }
- * }
- * dataflow {
- * kafkaSource.source1 {
- * schema = "metricStreamSchema"
- * }
- * kafkaSource.source2 {
- * schema = {
- * metric: string
- * value: double
- * timestamp: long
- * }
- * }
- * }
- * }
- * </code>
- * }}}
- */
-trait PipelineParser{
- val CONFIG_FIELD = "config"
- val SCHEMA_FIELD = "schema"
- val DATAFLOW_FIELD = "dataflow"
-
- def parse(config:Config):Pipeline = {
- if(config.isEmpty) throw new IllegalArgumentException("Pipeline configuration is empty")
- var pConfig:Config = ConfigFactory.empty()
- var pSchemaSet:SchemaSet = SchemaSet.empty()
- var pDataflow:DataFlow = null
- if(config.hasPath(CONFIG_FIELD)) pConfig = config.getConfig(CONFIG_FIELD)
- if(config.hasPath(SCHEMA_FIELD)) pSchemaSet = SchemaSet.parse(config.getConfig(SCHEMA_FIELD))
- if(config.hasPath(DATAFLOW_FIELD)) pDataflow = DataFlow.parse(config.getConfig(DATAFLOW_FIELD),pSchemaSet)
-
- // Merge pipeline config over base config
- val baseConfig =ConfigFactory.load()
- pConfig = if(pConfig!=null) pConfig.withFallback(baseConfig) else baseConfig
- new Pipeline(pConfig,pDataflow)
- }
-
- def parseString(config:String):Pipeline = parse(ConfigFactory.parseString(config))
-
- def parseStringWithConfig(dataFlow:String, config: Config) = {
- val pConfig = config.withFallback(ConfigFactory.parseString(dataFlow))
- parse(pConfig)
- }
-
- def parseResource(resource:String):Pipeline = {
- // TODO: Load environment, currently hard-code with storm
- if(resource.startsWith("/") || resource.startsWith("./")){
- parse(ConfigFactory.parseFile(new File(resource)))
- } else{
- parse(ConfigFactory.parseResourcesAnySyntax(getClass.getClassLoader,resource))
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
deleted file mode 100644
index 7653f9e..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Schema.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.parser
-
-import com.typesafe.config.Config
-
-import scala.collection.JavaConversions.mapAsScalaMap
-import scala.collection.mutable
-
- /** Base class of schema attributes; carries only the attribute name. */
- class Field(name:String) extends Serializable{
- def getName:String = name
- }
-
- // One case class per supported attribute type; DatetimeField additionally carries a format.
- case class StringField(name:String) extends Field(name)
- case class LongField(name:String) extends Field(name)
- case class IntegerField(name:String) extends Field(name)
- case class BooleanField(name:String) extends Field(name)
- case class FloatField(name:String) extends Field(name)
- case class DoubleField(name:String) extends Field(name)
- case class DatetimeField(name:String,format:String) extends Field(name)
-
- /**
-  * Factories for the typed schema attributes.
-  */
- object Field{
-   def string(name:String):StringField = StringField(name)
-   def long(name:String):LongField = LongField(name)
-   def integer(name:String):IntegerField = IntegerField(name)
-   def boolean(name:String):BooleanField = BooleanField(name)
-   def float(name:String):FloatField = FloatField(name)
-   def double(name:String):DoubleField = DoubleField(name)
-   def datetime(name:String)(format:String):DatetimeField = DatetimeField(name,format)
-
-   /**
-    * Creates the attribute for a simple type name.
-    *
-    * @param name     attribute name
-    * @param typeName one of "string", "long", "integer", "boolean", "float" or "double"
-    * @throws UnsupportedOperationException for any other type name
-    */
-   def apply(name:String,typeName:String):Field = {
-     typeName match {
-       case "string" => StringField(name)
-       case "long" => LongField(name)
-       case "integer" => IntegerField(name)
-       case "boolean" => BooleanField(name)
-       case "float" => FloatField(name)
-       case "double" => DoubleField(name)
-       case _ => throw new UnsupportedOperationException(s"""Unknown attribute type $typeName for attribute "$name"""")
-     }
-   }
- }
-
- /**
-  * Ordered list of attributes of a stream.
-  *
-  * @param attributes the attributes, may be null
-  */
- case class Schema(attributes:Seq[Field]) extends Serializable{
-   /** @return the attribute with the given name, if any */
-   def getAttribute(attributeName:String):Option[Field] = {
-     // Names are compared by value; `eq` is reference equality and misses equal strings
-     if(attributes == null) None else attributes.find(_.getName == attributeName)
-   }
-
-   /** @return the position of the attribute with the given name, or -1 if there is none */
-   def indexOfAttribute(attributeName:String):Int = {
-     if(attributes == null) -1 else attributes.indexWhere(_.getName == attributeName)
-   }
-
-   /**
-    * @return the position of the attribute with the given name
-    * @throws IllegalArgumentException if the schema has no such attribute
-    */
-   @throws[IllegalArgumentException]
-   def indexOfAttributeOrException(attributeName:String):Int = {
-     val index = indexOfAttribute(attributeName)
-     // Covers both a null attribute list and a name that is simply not present
-     if(index < 0) throw new IllegalArgumentException(s"Attribute [$attributeName] is not found in stream $this")
-     index
-   }
- }
-
- /**
-  * Factories that build a Schema from raw definitions.
-  */
- object Schema{
-   /**
-    * @param map attribute name -> simple type name
-    * @return the schema made of one attribute per entry
-    * @throws IllegalStateException if an attribute is defined by anything but a type name
-    */
-   def parse(map:Map[String,AnyRef]):Schema = {
-     def toField(attributeName:String):Field = map(attributeName) match{
-       case simpleType:String => Field(attributeName,simpleType)
-       case complexType:java.util.Map[String,AnyRef] => throw new IllegalStateException(s"ComplexType attribute definition is not supported yet [$attributeName : $complexType] ")
-       case otherType => throw new IllegalStateException(s"Illegal attribute definition $attributeName : $otherType")
-     }
-     // Keep mapping over `map.keys`: the order of the resulting attributes follows from it
-     new Schema(map.keys.map(toField).toSeq)
-   }
-
-   /**
-    * @param attributes support string, symbol, Attribute and so on.
-    * @return the schema made of the given attributes, in the given order
-    * @throws UnsupportedOperationException if an element is neither a Field nor a pair whose
-    *                                       second element is a type name or symbol
-    */
-   def build(attributes:Seq[AnyRef]):Schema = {
-     val fields = attributes.map {
-       case (name, typeName:String) => Field(name.asInstanceOf[String],typeName)
-       case (name, typeSymbol:Symbol) => Field(name.asInstanceOf[String],typeSymbol.name)
-       case field:Field => field
-       case illegal => throw new UnsupportedOperationException(s"Illegal attribute definition $illegal")
-     }
-     new Schema(fields)
-   }
- }
-
- /** Exception whose message defaults to "stream is not defined"; an optional cause can be given. */
- private[pipeline] class StreamUndefinedException(message:String = "stream is not defined",throwable: Throwable = null) extends Exception(message,throwable)
-
- /**
-  * Set of schemas addressable by id; an id can be defined only once.
-  */
- private[pipeline] class SchemaSet {
-   private val processorSchemaCache = mutable.Map[String,Schema]()
-
-   /**
-    * Defines the schema for an id.
-    *
-    * @throws IllegalArgumentException if a schema is already defined for the id
-    */
-   def set(schemaId:String,schema:Schema):Unit = {
-     // stripMargin removes the `|` margin markers, which otherwise end up in the message
-     if(processorSchemaCache.contains(schemaId)) throw new IllegalArgumentException(
- s"""
- |Failed to define schema for $schemaId as $schema,
- |because it has been defined as ${processorSchemaCache(schemaId)},
- |please call updateSchema(processorId,schema) instead
- """.stripMargin)
-     processorSchemaCache.put(schemaId,schema)
-   }
-
-   /** @return the schema defined for the id, if any */
-   def get(schemaId:String):Option[Schema] = processorSchemaCache.get(schemaId)
- }
-
- /**
-  * Factories for SchemaSet.
-  */
- private[pipeline] object SchemaSet{
-   /** @return a new set without any schema */
-   def empty() = new SchemaSet()
-
-   /**
-    * Builds a schema set from a map of schema id to attribute definitions, for example:
-    *
-    * <code>
-    * {
-    *   metricStream {
-    *     metric: string
-    *     value: double
-    *     timestamp: long
-    *   }
-    * }
-    * </code>
-    *
-    * @param schemaConfig schema id -> attribute definitions
-    * @return the set holding one schema per entry
-    */
-   def parse(schemaConfig:Map[String,AnyRef]):SchemaSet = {
-     val result = new SchemaSet()
-     for((schemaId,definition) <- schemaConfig){
-       val attributes = definition.asInstanceOf[java.util.HashMap[String,AnyRef]].toMap
-       result.set(schemaId,Schema.parse(attributes))
-     }
-     result
-   }
-
-   /** Builds a schema set from the `schema` section of a pipeline configuration. */
-   def parse(config:Config):SchemaSet = {
-     val unwrapped = config.root().unwrapped().asInstanceOf[java.util.HashMap[String,AnyRef]]
-     parse(unwrapped.toMap)
-   }
- }
-
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
deleted file mode 100644
index 1c964e1..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/runner/PipelineRunner.scala
+++ /dev/null
@@ -1,115 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.runner
-
-import java.util
-
-import com.typesafe.config.Config
-import org.apache.commons.cli.{CommandLine, Options}
-import org.apache.eagle.dataproc.util.ConfigOptionParser
-import org.apache.eagle.datastream.ExecutionEnvironments.storm
-import org.apache.eagle.datastream.core.ExecutionEnvironment
-import org.apache.eagle.stream.pipeline.compiler.PipelineCompiler
-import org.apache.eagle.stream.pipeline.parser.PipelineParser
-import org.slf4j.LoggerFactory
-
-import scala.reflect.runtime.{universe => ru}
-
- /**
-  * Entry points that parse a pipeline definition, compile it and submit it to an
-  * execution environment.
-  */
- trait PipelineRunner extends PipelineParser with PipelineCompiler{
- import PipelineCLIOptionParser._
- // NOTE(review): the logger is named after PipelineCLIOptionParser and is not used in this trait.
- private val LOG = LoggerFactory.getLogger("PipelineCLIOptionParser")
- // Each submit overload parses its input (resource name or Config), compiles the result and
- // submits it to the environment given by type parameter or by class.
- def submit[T <: ExecutionEnvironment](resource:String)(implicit typeTag:ru.TypeTag[T]) =
- compile(parseResource(resource)).submit[T]
- def submit(resource:String,clazz:Class[ExecutionEnvironment]) =
- compile(parseResource(resource)).submit(clazz)
- def submit(pipelineConfig:Config,clazz:Class[ExecutionEnvironment]) =
- compile(parse(pipelineConfig)).submit(clazz)
- def submit[T <: ExecutionEnvironment](pipelineConfig:Config)(implicit typeTag: ru.TypeTag[T]) =
- compile(parse(pipelineConfig)).submit[T]
-
- // Loads the command line arguments through a ConfigOptionParser and returns this runner;
- // the value returned by load is discarded.
- def apply(args:Array[String]):PipelineRunner = {
- new ConfigOptionParser().load(args)
- this
- }
-
- // Command line entry point: submits the pipeline named by the pipeline option to storm,
- // or fails with the usage text when that option is missing.
- def main(args: Array[String]): Unit = {
- val config = PipelineCLIOptionParser.load(args)
- if(config.hasPath(PIPELINE_CONFIG_KEY)) {
- submit[storm](config.getString(PIPELINE_CONFIG_KEY))
- } else {
- sys.error(
- s"""
- |Error: --$PIPELINE_OPT_KEY is required
- |$USAGE
- """.stripMargin)
- }
- }
- }
-
- /**
-  * Command line option parser of the pipeline runner; adds the `pipeline` and `conf`
-  * options to the ones of ConfigOptionParser.
-  */
- private[runner] object PipelineCLIOptionParser extends ConfigOptionParser{
-   val LOG = LoggerFactory.getLogger("PipelineCLIOptionParser")
-   val PIPELINE_OPT_KEY="pipeline"
-
-   val PIPELINE_CONFIG_KEY="pipeline.config"
-
-   val CONFIG_OPT_KEY="conf"
-   val CONFIG_RESOURCE_KEY="config.resource"
-   val CONFIG_FILE_KEY="config.file"
-   val USAGE =
- """
- |Usage: java org.apache.eagle.stream.pipeline.Pipeline [options]
- |
- |Options:
- | --pipeline pipeline configuration
- | --conf common configuration
- | --env storm (support spark, etc later)
- | --mode local/remote/cluster
- """.stripMargin
-
-   /** @return the inherited options plus `pipeline` and `conf`, both taking an argument */
-   override protected def options(): Options = {
-     val options = super.options()
-     options.addOption(PIPELINE_OPT_KEY, true, "Pipeline configuration file")
-     options.addOption(CONFIG_OPT_KEY, true, "Config properties file")
-     options
-   }
-
-   /**
-    * Adds to the inherited settings: "pipeline.config" for the `pipeline` option and, for the
-    * `conf` option, "config.file" when its value contains "/" and "config.resource" otherwise.
-    *
-    * @throws IllegalArgumentException if one of the two options is present without a value
-    */
-   override protected def parseCommand(cmd: CommandLine): util.Map[String, String] = {
-     val map = super.parseCommand(cmd)
-
-     if (cmd.hasOption(PIPELINE_OPT_KEY)) {
-       val pipelineConf = cmd.getOptionValue(PIPELINE_OPT_KEY)
-       if(pipelineConf == null){
-         throw new IllegalArgumentException(s"--$PIPELINE_OPT_KEY should not be null")
-       } else {
-         LOG.info(s"Set $PIPELINE_CONFIG_KEY as $pipelineConf")
-         map.put(PIPELINE_CONFIG_KEY, pipelineConf)
-       }
-     }
-
-     if(cmd.hasOption(CONFIG_OPT_KEY)){
-       val commonConf = cmd.getOptionValue(CONFIG_OPT_KEY)
-       // Same guard as for the pipeline option, instead of a NullPointerException below
-       if(commonConf == null){
-         throw new IllegalArgumentException(s"--$CONFIG_OPT_KEY should not be null")
-       } else if(commonConf.contains("/")){
-         LOG.info(s"Set $CONFIG_FILE_KEY as $commonConf")
-         map.put(CONFIG_FILE_KEY, commonConf)
-       }else {
-         LOG.info(s"Set $CONFIG_RESOURCE_KEY $commonConf")
-         map.put(CONFIG_RESOURCE_KEY, commonConf)
-       }
-     }
-     map
-   }
- }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
deleted file mode 100644
index 1102a33..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/utils/Exceptions.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.stream.pipeline.utils
-
-class ParseException(message:String) extends Exception(message)
-class CompileException(message:String) extends Exception(message)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
deleted file mode 100644
index 3e8f69c..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/application.conf
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
- "eagleProps" : {
- "dataJoinPollIntervalSec" : 30
- "mailHost" : "smtp.server.host"
- "mailSmtpPort":"25"
- "mailDebug" : "true"
- "eagleService": {
- "host": "localhost"
- "port": 9099
- "username": "admin"
- "password": "secret"
- }
- }
- "dynamicConfigSource" : {
- "enabled" : true
- "initDelayMillis" : 0
- "delayMillis" : 30000
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
deleted file mode 100644
index 4250681..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/eagle-pipeline.sh
+++ /dev/null
@@ -1,20 +0,0 @@
-#!/usr/bin/env bash
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# ./eagle-pipeline.sh --pipeline [pipeline-definition-config] --config [base-configuration]
-
-
-
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
deleted file mode 100644
index c8a4f46..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,19 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, stdout
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
deleted file mode 100644
index 6dddf7a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_1.conf
+++ /dev/null
@@ -1,131 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
- config {
- alertExecutorConfigs {
- defaultAlertExecutor {
- "parallelism" : 1
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
- "needValidation" : "true"
- }
- }
- eagleProps {
- "site" : "sandbox"
- "application": "eventSource"
- "dataJoinPollIntervalSec" : 30
- "mailHost" : "mail.host.com"
- "mailSmtpPort":"25"
- "mailDebug" : "true"
- "eagleService": {
- "host": "localhost"
- "port": 38080
- "username": "admin"
- "password": "secret"
- }
- }
- dynamicConfigSource {
- "enabled" : true
- "initDelayMillis" : 0
- "delayMillis" : 30000
- }
- }
-
- schema {
- metricStreamSchema {
- metric: string
- value: double
- timestamp: long
- }
- }
-
- dataflow {
- KafkaSource.metricStream_1 {
- parallism = 1000
- topic = "metric_event_1"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- schema = "metricStreamSchema"
- }
-
- KafkaSource.metricStream_2 {
- parallism = 1000
- topic = "metric_event_2"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- }
-
- KafkaSource.metricStream_3{
- parallism = 1000
- topic = "metric_event_3"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- schema = "metricStreamSchema"
- }
-
- KafkaSink.metricStore {
- schema = "metricStreamSchema"
- parallism = 1000
- topic = "metric_event_2"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- }
-
- Alert.alert {
-// upStreamNames = [metricStream_1,metricStream_2]
- alertExecutorId = defaultAlertExecutor
- }
-
-// aggregator.aggreator {
-// executor = "aggreationExecutor"
-// }
-
- metricStream_1|metricStream_2 -> alert {
- group = shuffle
- }
-
- metricStream_1|metricStream_2 -> metricStore {
- group = shuffle
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
deleted file mode 100644
index 5e3561a..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_2.conf
+++ /dev/null
@@ -1,93 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
- config {
- envContextConfig {
- "env" : "storm"
- "mode" : "local"
- "topologyName" : "dsl-based-topology"
- }
- eagleProps {
- "site" : "sandbox"
- "application": "eventSource"
- "dataJoinPollIntervalSec" : 30
- "mailHost" : "mail.host.com"
- "mailSmtpPort":"25"
- "mailDebug" : "true"
- "eagleService": {
- "host": "localhost"
- "port": 38080
- "username": "admin"
- "password": "secret"
- }
- }
- dynamicConfigSource {
- "enabled" : true
- "initDelayMillis" : 0
- "delayMillis" : 30000
- }
- }
-
- dataflow {
- KafkaSource.metricStream_1 {
- parallism = 1000
- topic = "metric_event_1"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- }
-
- KafkaSource.metricStream_2 {
- parallism = 1000
- topic = "metric_event_2"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- }
-
- KafkaSource.metricStream_3{
- parallism = 1000
- topic = "metric_event_3"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- }
-
- Console.printer {}
-
- metricStream_1|metricStream_2|metricStream_3 -> printer {
- grouping = shuffle
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
deleted file mode 100644
index 9dc7ce3..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_3.conf
+++ /dev/null
@@ -1,152 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
- config {
- envContextConfig {
- "env" : "storm"
- "mode" : "local"
- "topologyName" : "dsl-based-topology"
- "parallelismConfig" : {
- "kafkaMsgConsumer" : 1
- }
- }
- alertExecutorConfigs {
- defaultAlertExecutor {
- "parallelism" : 1
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
- "needValidation" : "true"
- }
- }
- eagleProps {
- "site" : "sandbox"
- "application": "HADOOP"
- "dataJoinPollIntervalSec" : 30
- "mailHost" : "some.mail.server"
- "mailSmtpPort":"25"
- "mailDebug" : "true"
- "eagleService": {
- "host": "localhost"
- "port": 38080
- "username": "admin"
- "password": "secret"
- }
- }
- dynamicConfigSource {
- "enabled" : true
- "initDelayMillis" : 0
- "delayMillis" : 30000
- }
- }
-
- schema {
-// JmxStreamOne {
-// attributes {
-// metric: string
-// value: double
-// timestamp: long
-// }
-// alertExecutorId = [defaultAlertExecutor,anotherAlertExecutor]
-// }
- JmxStreamOne {
- metric: string
- value: double
- timestamp: long
- }
- }
-
- dataflow {
- KafkaSource.JmxStreamOne {
- parallism = 1000
- topic = "metric_event_1"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- }
-
- KafkaSource.JmxStreamTwo {
- parallism = 1000
- topic = "metric_event_2"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- }
-
- KafkaSource.JmxStreamThree{
- parallism = 1000
- topic = "metric_event_3"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- }
-
- Console.printer {
- format = "%s"
- }
-
- KafkaSink.metricStore {
- topic = "metric_event_persist"
- }
-
-// KafkaSink.alertStore {
-// "topic" = "alert_persist"
-// "bootstrap.servers" = "localhost:6667"
-// }
-
- Alert.alert {
- inputs = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
-
- upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
- alertExecutorId = defaultAlertExecutor
- }
-
-// Aggregator.aggreator {
-// upStreamNames = []
-// analyzerId = ""
-// cepQl = ""
-// strategy = ""
-// }
-
- JmxStreamOne|JmxStreamTwo|JmxStreamThree -> alert {
- grouping = shuffle
- }
-
- JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
- grouping = shuffle
- }
-
- JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
- grouping = shuffle
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
deleted file mode 100644
index 9c35456..0000000
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/test/resources/pipeline_4.conf
+++ /dev/null
@@ -1,125 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing software
-# distributed under the License is distributed on an "AS IS" BASIS
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
- config {
- envContextConfig {
- "env" : "storm"
- "mode" : "local"
- "topologyName" : "dsl-topology"
- "parallelismConfig" : {
- "kafkaMsgConsumer" : 1
- },
- }
- alertExecutorConfigs {
- defaultAlertExecutor {
- "parallelism" : 1
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
- "needValidation" : "true"
- }
- }
- eagleProps {
- "site" : "sandbox"
- "application": "HADOOP"
- }
- }
-
- dataflow {
- KafkaSource.JmxStreamOne {
- parallism = 1000
- topic = "metric_event_1"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- }
-
- KafkaSource.JmxStreamTwo {
- parallism = 1000
- topic = "metric_event_2"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- }
-
- KafkaSource.JmxStreamThree{
- parallism = 1000
- topic = "metric_event_3"
- zkConnection = "localhost:2181"
- zkConnectionTimeoutMS = 15000
- consumerGroupId = "Consumer"
- fetchSize = 1048586
- transactionZKServers = "localhost"
- transactionZKPort = 2181
- transactionZKRoot = "/consumers"
- transactionStateUpdateMS = 2000
- deserializerClass = "org.apache.eagle.datastream.storm.JsonMessageDeserializer"
- }
-
- Console.printer {
- format = "%s"
- }
-
- KafkaSink.metricStore {
- topic = "metric_event_persist"
- }
-
-// KafkaSink.aggSink {
-// topic = "metric_agg_persist"
-// }
-
- Alert.defaultAlertExecutor {
- // upStreamNames = [JmxStreamOne,JmxStreamTwo,JmxStreamThree]
- // alertExecutorId = defaultAlertExecutor
- }
-
- Aggregator.Aggregator{ sql = """
- define stream JmxStreamOne(eagleAlertContext object, timestamp long, metric string, value double);
- @info(name = "query")
- from JmxStreamOne[value > 100.0] select * insert into outputStream;
- """}
-
-
- JmxStreamOne -> Aggregator {}
-
- Aggregator -> printer {}
-
-// Aggregator -> aggregatedSink{
-// grouping = shuffle
-// }
- JmxStreamOne|JmxStreamTwo|JmxStreamThree -> defaultAlertExecutor {
- grouping = shuffle
- }
-
- JmxStreamOne|JmxStreamTwo|JmxStreamThree -> metricStore {
- grouping = shuffle
- }
-
- JmxStreamOne|JmxStreamTwo|JmxStreamThree -> printer {
- grouping = shuffle
- }
- }
-}
\ No newline at end of file