You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/06/02 07:07:46 UTC
[07/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen
alert engine code on branch-0.5
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json
deleted file mode 100644
index f4e72bf..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json
+++ /dev/null
@@ -1,123 +0,0 @@
-{
- "version": null,
- "topologyName": "testTopology",
- "routerSpecs": [
- {
- "streamId": "testTopic3Stream",
- "partition": {
- "streamId": "testTopic3Stream",
- "type": "GROUPBY",
- "columns": [
- "value"
- ],
- "sortSpec": {
- "windowPeriod": "PT10S",
- "windowMargin": 1000
- }
- },
- "targetQueue": [
- {
- "partition": {
- "streamId": "testTopic3Stream",
- "type": "GROUPBY",
- "columns": [
- "value"
- ],
- "sortSpec": {
- "windowPeriod": "PT10S",
- "windowMargin": 1000
- }
- },
- "workers": [
- {
- "topologyName": "testTopology",
- "boltId": "alertBolt0"
- },
- {
- "topologyName": "testTopology",
- "boltId": "alertBolt1"
- }
- ]
- }
- ]
- },
- {
- "streamId": "testTopic4Stream",
- "partition": {
- "streamId": "testTopic4Stream",
- "type": "GROUPBY",
- "columns": [
- "value"
- ],
- "sortSpec": {
- "windowPeriod": "PT10S",
- "windowMargin": 1000
- }
- },
- "targetQueue": [
- {
- "partition": {
- "streamId": "testTopic4Stream",
- "type": "GROUPBY",
- "columns": [
- "value"
- ],
- "sortSpec": {
- "windowPeriod": "PT10S",
- "windowMargin": 1000
- }
- },
- "workers": [
- {
- "topologyName": "testTopology",
- "boltId": "alertBolt0"
- },
- {
- "topologyName": "testTopology",
- "boltId": "alertBolt1"
- }
- ]
- }
- ]
- },
- {
- "streamId": "testTopic5Stream",
- "partition": {
- "streamId": "testTopic5Stream",
- "type": "GROUPBY",
- "columns": [
- "value"
- ],
- "sortSpec": {
- "windowPeriod": "PT10S",
- "windowMargin": 1000
- }
- },
- "targetQueue": [
- {
- "partition": {
- "streamId": "testTopic5Stream",
- "type": "GROUPBY",
- "columns": [
- "value"
- ],
- "sortSpec": {
- "windowPeriod": "PT10S",
- "windowMargin": 1000
- }
- },
- "workers": [
- {
- "topologyName": "testTopology",
- "boltId": "alertBolt0"
- },
- {
- "topologyName": "testTopology",
- "boltId": "alertBolt1"
- }
- ]
- }
- ]
- }
- ]
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json
deleted file mode 100644
index b49d6ad..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json
+++ /dev/null
@@ -1 +0,0 @@
-nn_jmx_metric_sandbox
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json
deleted file mode 100644
index 411cc48..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json
+++ /dev/null
@@ -1,31 +0,0 @@
-[
-{
- "name": "alertUnitTopology_1",
- "numOfSpout":1,
- "numOfAlertBolt": 10,
- "numOfGroupBolt": 4,
- "spoutId": "alertEngineSpout",
- "groupNodeIds" : [
- "streamRouterBolt0",
- "streamRouterBolt1",
- "streamRouterBolt2",
- "streamRouterBolt3"
- ],
- "alertBoltIds": [
- "alertBolt0",
- "alertBolt1",
- "alertBolt2",
- "alertBolt3",
- "alertBolt4",
- "alertBolt5",
- "alertBolt6",
- "alertBolt7",
- "alertBolt8",
- "alertBolt9"
- ],
- "pubBoltId" : "alertPublishBolt",
- "spoutParallelism": 1,
- "groupParallelism": 1,
- "alertParallelism": 1
-}
-]
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/pom.xml b/eagle-core/eagle-alert/alert/alert-engine/pom.xml
deleted file mode 100644
index 4e68b4c..0000000
--- a/eagle-core/eagle-alert/alert/alert-engine/pom.xml
+++ /dev/null
@@ -1,110 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- ~ /* ~ * Licensed to the Apache Software Foundation (ASF) under one
- or more ~ * contributor license agreements. See the NOTICE file distributed
- with ~ * this work for additional information regarding copyright ownership.
- ~ * The ASF licenses this file to You under the Apache License, Version 2.0
- ~ * (the "License"); you may not use this file except in compliance with
- ~ * the License. You may obtain a copy of the License at ~ * ~ * http://www.apache.org/licenses/LICENSE-2.0
- ~ * ~ * Unless required by applicable law or agreed to in writing, software
- ~ * distributed under the License is distributed on an "AS IS" BASIS, ~ *
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ * See the License for the specific language governing permissions and ~
- * limitations under the License. ~ */ -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>io.sherlock</groupId>
- <artifactId>alert-parent</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>alert-engine-parent</artifactId>
- <packaging>pom</packaging>
-
- <modules>
- <module>alert-engine-base</module>
- </modules>
-
- <dependencies>
- <dependency>
- <groupId>io.sherlock</groupId>
- <artifactId>alert-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>${kafka.artifact.id}</artifactId>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-client</artifactId>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-jaxrs</artifactId>
- </dependency>
- <dependency>
- <groupId>com.netflix.archaius</groupId>
- <artifactId>archaius-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.wso2.siddhi</groupId>
- <artifactId>siddhi-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.wso2.siddhi</groupId>
- <artifactId>siddhi-query-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.wso2.siddhi</groupId>
- <artifactId>siddhi-query-compiler</artifactId>
- </dependency>
- <dependency>
- <groupId>org.wso2.siddhi</groupId>
- <artifactId>siddhi-extension-regex</artifactId>
- </dependency>
- <dependency>
- <groupId>org.wso2.siddhi</groupId>
- <artifactId>siddhi-extension-string</artifactId>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.101tec</groupId>
- <artifactId>zkclient</artifactId>
- </dependency>
- <dependency>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- </dependency>
- <!--<dependency>-->
- <!--<groupId>org.slf4j</groupId>-->
- <!--<artifactId>slf4j-log4j12</artifactId>-->
- <!--</dependency>-->
- <!--<dependency>-->
- <!--<groupId>org.slf4j</groupId>-->
- <!--<artifactId>slf4j-api</artifactId>-->
- <!--</dependency>-->
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore
deleted file mode 100644
index 1dd3331..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-/target/
-/target/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml
deleted file mode 100644
index 83749b5..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml
+++ /dev/null
@@ -1,113 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- ~ /* ~ * Licensed to the Apache Software Foundation (ASF) under one
- or more ~ * contributor license agreements. See the NOTICE file distributed
- with ~ * this work for additional information regarding copyright ownership.
- ~ * The ASF licenses this file to You under the Apache License, Version 2.0
- ~ * (the "License"); you may not use this file except in compliance with
- ~ * the License. You may obtain a copy of the License at ~ * ~ * http://www.apache.org/licenses/LICENSE-2.0
- ~ * ~ * Unless required by applicable law or agreed to in writing, software
- ~ * distributed under the License is distributed on an "AS IS" BASIS, ~ *
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ * See the License for the specific language governing permissions and ~
- * limitations under the License. ~ */ -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.eagle</groupId>
- <artifactId>alert-metadata-parent</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>alert-metadata-service</artifactId>
- <packaging>war</packaging>
-
- <dependencies>
- <!-- Storm depends on org.ow2.asm:asm:4.0 -->
- <!-- Jersey depends on asm:asm:3.0 -->
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>alert-engine-base</artifactId>
- <version>${project.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.ow2.asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>alert-metadata</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-server</artifactId>
- <!--<exclusions> -->
- <!--<exclusion> -->
- <!--<groupId>asm</groupId> -->
- <!--<artifactId>asm</artifactId> -->
- <!--</exclusion> -->
- <!--</exclusions> -->
- </dependency>
- <dependency>
- <groupId>com.sun.jersey.contribs</groupId>
- <artifactId>jersey-multipart</artifactId>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-jaxrs</artifactId>
- </dependency>
- <dependency>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-xc</artifactId>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </dependency>
- <dependency>
- <groupId>com.typesafe</groupId>
- <artifactId>config</artifactId>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.tomcat.embed</groupId>
- <artifactId>tomcat-embed-core</artifactId>
- </dependency>
- <dependency>
- <groupId>io.swagger</groupId>
- <artifactId>swagger-jaxrs</artifactId>
- </dependency>
- <dependency>
- <groupId>com.sun.jersey</groupId>
- <artifactId>jersey-servlet</artifactId>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.eclipse.jetty</groupId>
- <artifactId>jetty-maven-plugin</artifactId>
- <configuration>
- <scanIntervalSeconds>5</scanIntervalSeconds>
- </configuration>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java
deleted file mode 100644
index 94ec767..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.resource;
-
-import java.util.List;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
-
-/**
- * JAX-RS resource rooted at {@code /metadata}. Every endpoint consumes and
- * produces JSON and delegates straight to the {@link IMetadataDao} obtained
- * from {@link MetadataDaoFactory}; no additional logic lives in this class.
- *
- * @since Apr 11, 2016
- */
-@Path("/metadata")
-@Produces("application/json")
-@Consumes("application/json")
-public class MetadataResource {
-
-    /** DAO that backs every endpoint of this resource. */
-    private final IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
-
-    // ---------------------------------------------------------------- bulk operations
-
-    /** Delegates to {@code dao.clear()} and returns its result. */
-    @Path("/clear")
-    @POST
-    public OpResult clear() {
-        return dao.clear();
-    }
-
-    /** Returns whatever {@code dao.export()} produces. */
-    @Path("/export")
-    @GET
-    public Models export() {
-        return dao.export();
-    }
-
-    /**
-     * Hands the models carried in the request body to {@code dao.importModels}.
-     * Bound to POST (previously GET): the operation reads a request entity,
-     * and a GET request is not expected to carry one.
-     *
-     * @param model models deserialized from the request body
-     * @return the result reported by the DAO
-     */
-    @Path("/import")
-    @POST
-    public OpResult importModels(Models model) {
-        return dao.importModels(model);
-    }
-
-    // ---------------------------------------------------------------- clusters
-
-    /** Returns the clusters reported by {@code dao.listClusters()}. */
-    @Path("/clusters")
-    @GET
-    public List<StreamingCluster> listClusters() {
-        return dao.listClusters();
-    }
-
-    /** Passes the cluster from the request body to {@code dao.addCluster}. */
-    @Path("/clusters")
-    @POST
-    public OpResult addCluster(StreamingCluster cluster) {
-        return dao.addCluster(cluster);
-    }
-
-    /** Passes the {@code clusterId} path segment to {@code dao.removeCluster}. */
-    @Path("/clusters/{clusterId}")
-    @DELETE
-    public OpResult removeCluster(@PathParam("clusterId") String clusterId) {
-        return dao.removeCluster(clusterId);
-    }
-
-    // ---------------------------------------------------------------- streams
-
-    /** Returns the stream definitions reported by {@code dao.listStreams()}. */
-    @Path("/streams")
-    @GET
-    public List<StreamDefinition> listStreams() {
-        return dao.listStreams();
-    }
-
-    /** Passes the stream definition from the request body to {@code dao.createStream}. */
-    @Path("/streams")
-    @POST
-    public OpResult createStream(StreamDefinition stream) {
-        return dao.createStream(stream);
-    }
-
-    /** Passes the {@code streamId} path segment to {@code dao.removeStream}. */
-    @Path("/streams/{streamId}")
-    @DELETE
-    public OpResult removeStream(@PathParam("streamId") String streamId) {
-        return dao.removeStream(streamId);
-    }
-
-    // ---------------------------------------------------------------- data sources
-
-    /** Returns the data sources reported by {@code dao.listDataSources()}. */
-    @Path("/datasources")
-    @GET
-    public List<Kafka2TupleMetadata> listDataSources() {
-        return dao.listDataSources();
-    }
-
-    /** Passes the data source from the request body to {@code dao.addDataSource}. */
-    @Path("/datasources")
-    @POST
-    public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
-        return dao.addDataSource(dataSource);
-    }
-
-    /** Passes the {@code datasourceId} path segment to {@code dao.removeDataSource}. */
-    @Path("/datasources/{datasourceId}")
-    @DELETE
-    public OpResult removeDataSource(@PathParam("datasourceId") String datasourceId) {
-        return dao.removeDataSource(datasourceId);
-    }
-
-    // ---------------------------------------------------------------- policies
-
-    /** Returns the policy definitions reported by {@code dao.listPolicies()}. */
-    @Path("/policies")
-    @GET
-    public List<PolicyDefinition> listPolicies() {
-        return dao.listPolicies();
-    }
-
-    /** Passes the policy from the request body to {@code dao.addPolicy}. */
-    @Path("/policies")
-    @POST
-    public OpResult addPolicy(PolicyDefinition policy) {
-        return dao.addPolicy(policy);
-    }
-
-    /** Passes the {@code policyId} path segment to {@code dao.removePolicy}. */
-    @Path("/policies/{policyId}")
-    @DELETE
-    public OpResult removePolicy(@PathParam("policyId") String policyId) {
-        return dao.removePolicy(policyId);
-    }
-
-    // ---------------------------------------------------------------- publishments
-
-    /** Returns the publishments reported by {@code dao.listPublishment()}. */
-    @Path("/publishments")
-    @GET
-    public List<Publishment> listPublishment() {
-        return dao.listPublishment();
-    }
-
-    /** Passes the publishment from the request body to {@code dao.addPublishment}. */
-    @Path("/publishments")
-    @POST
-    public OpResult addPublishment(Publishment publishment) {
-        return dao.addPublishment(publishment);
-    }
-
-    /** Passes the {@code pubId} path segment to {@code dao.removePublishment}. */
-    @Path("/publishments/{pubId}")
-    @DELETE
-    public OpResult removePublishment(@PathParam("pubId") String pubId) {
-        return dao.removePublishment(pubId);
-    }
-
-    // ---------------------------------------------------------------- publishment types
-
-    /** Returns the publishment types reported by {@code dao.listPublishmentType()}. */
-    @Path("/publishmentTypes")
-    @GET
-    public List<PublishmentType> listPublishmentType() {
-        return dao.listPublishmentType();
-    }
-
-    /** Passes the publishment type from the request body to {@code dao.addPublishmentType}. */
-    @Path("/publishmentTypes")
-    @POST
-    public OpResult addPublishmentType(PublishmentType publishmentType) {
-        return dao.addPublishmentType(publishmentType);
-    }
-
-    /** Passes the {@code pubType} path segment to {@code dao.removePublishmentType}. */
-    @Path("/publishmentTypes/{pubType}")
-    @DELETE
-    public OpResult removePublishmentType(@PathParam("pubType") String pubType) {
-        return dao.removePublishmentType(pubType);
-    }
-
-    // ---------------------------------------------------------------- schedule states
-
-    /** Returns {@code dao.getScheduleState(versionId)} for the {@code versionId} path segment. */
-    @Path("/schedulestates/{versionId}")
-    @GET
-    public ScheduleState listScheduleState(@PathParam("versionId") String versionId) {
-        return dao.getScheduleState(versionId);
-    }
-
-    /** Returns the result of the no-argument {@code dao.getScheduleState()}. */
-    @Path("/schedulestates")
-    @GET
-    public ScheduleState latestScheduleState() {
-        return dao.getScheduleState();
-    }
-
-    /** Passes the schedule state from the request body to {@code dao.addScheduleState}. */
-    @Path("/schedulestates")
-    @POST
-    public OpResult addScheduleState(ScheduleState state) {
-        return dao.addScheduleState(state);
-    }
-
-    // ---------------------------------------------------------------- assignments
-
-    /** Returns the policy assignments reported by {@code dao.listAssignments()}. */
-    @Path("/assignments")
-    @GET
-    public List<PolicyAssignment> listAssignmenets() {
-        return dao.listAssignments();
-    }
-
-    /** Passes the policy assignment from the request body to {@code dao.addAssignment}. */
-    @Path("/assignments")
-    @POST
-    public OpResult addAssignmenet(PolicyAssignment pa) {
-        return dao.addAssignment(pa);
-    }
-
-    // ---------------------------------------------------------------- topologies
-
-    /** Returns the topologies reported by {@code dao.listTopologies()}. */
-    @Path("/topologies")
-    @GET
-    public List<Topology> listTopologies() {
-        return dao.listTopologies();
-    }
-
-    /** Passes the topology from the request body to {@code dao.addTopology}. */
-    @Path("/topologies")
-    @POST
-    public OpResult addTopology(Topology t) {
-        return dao.addTopology(t);
-    }
-
-    /** Passes the {@code topologyName} path segment to {@code dao.removeTopology}. */
-    @Path("/topologies/{topologyName}")
-    @DELETE
-    public OpResult removeTopology(@PathParam("topologyName") String topologyName) {
-        return dao.removeTopology(topologyName);
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java
deleted file mode 100644
index 437068f..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/TopologyMgmtResource.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.service.topology.resource;
-
-import org.apache.eagle.alert.metadata.resource.OpResult;
-import org.apache.eagle.service.topology.resource.impl.TopologyMgmtResourceImpl;
-import org.apache.eagle.service.topology.resource.impl.TopologyStatus;
-
-import javax.ws.rs.*;
-
-import java.util.List;
-
-/**
- * JAX-RS resource rooted at {@code /alert} that forwards topology start, stop
- * and listing requests to a {@link TopologyMgmtResourceImpl}.
- *
- * @since May 5, 2016
- */
-@Path("/alert")
-@Produces("application/json")
-@Consumes("application/json")
-public class TopologyMgmtResource {
-    private TopologyMgmtResourceImpl topologyManager = new TopologyMgmtResourceImpl();
-
-    /** A topology operation that is allowed to fail with any exception. */
-    private interface TopologyAction {
-        void run() throws Exception;
-    }
-
-    /**
-     * Runs the action and reports the outcome as an {@link OpResult}. A thrown
-     * exception is not propagated; its {@code toString()} is stored in the
-     * result's {@code message} field instead.
-     */
-    private static OpResult attempt(TopologyAction action) {
-        final OpResult outcome = new OpResult();
-        try {
-            action.run();
-        } catch (Exception failure) {
-            outcome.message = failure.toString();
-        }
-        return outcome;
-    }
-
-    /** Asks the topology manager to start the topology named in the path. */
-    @POST
-    @Path("/topologies/{topologyName}/start")
-    public OpResult startTopology(@PathParam("topologyName") String topologyName) {
-        return attempt(() -> topologyManager.startTopology(topologyName));
-    }
-
-    /** Asks the topology manager to stop the topology named in the path. */
-    @POST
-    @Path("/topologies/{topologyName}/stop")
-    public OpResult stopTopology(@PathParam("topologyName") String topologyName) {
-        return attempt(() -> topologyManager.stopTopology(topologyName));
-    }
-
-    /** Returns the topology statuses reported by the topology manager. */
-    @GET
-    @Path("/topologies")
-    public List<TopologyStatus> getTopologies() throws Exception {
-        return topologyManager.getTopologies();
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java
deleted file mode 100644
index 3876116..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceHelper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.service.topology.resource.impl;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * Static helpers for locating objects by the value of their {@code getName()}
- * method, which is looked up reflectively.
- */
-public class TopologyMgmtResourceHelper {
-    private static final Logger LOG = LoggerFactory.getLogger(TopologyMgmtResourceHelper.class);
-
-    /**
-     * Finds the first element whose reflective {@code getName()} value equals
-     * {@code id}, ignoring case.
-     *
-     * @param clusters candidates to search, in iteration order
-     * @param id       name to look for; a {@code null} id matches nothing
-     * @return the first matching element, or an empty {@link Optional}
-     * @throws RuntimeException if {@code getName()} cannot be invoked on an element
-     */
-    public static <T> Optional<T> findById(List<T> clusters, String id) {
-        return clusters.stream()
-                .filter(candidate -> {
-                    String name = getName(candidate);
-                    // An element without a name can never match; skip it instead of failing with an NPE.
-                    return name != null && name.equalsIgnoreCase(id);
-                })
-                .findFirst();
-    }
-
-    /**
-     * Invokes the public no-argument {@code getName()} method of {@code t}
-     * through reflection and returns its result cast to {@link String}.
-     *
-     * @param t object to read the name from
-     * @return the value returned by {@code t.getName()}
-     * @throws RuntimeException if the method is missing or cannot be invoked;
-     *                          the underlying reflection failure is attached as the cause
-     */
-    public static <T> String getName(T t) {
-        try {
-            Method m = t.getClass().getMethod("getName");
-            return (String) m.invoke(t);
-        } catch (NoSuchMethodException | SecurityException | InvocationTargetException | IllegalAccessException
-                | IllegalArgumentException e) {
-            // Keep the cause in both the log and the rethrown exception so the real failure stays visible.
-            LOG.error(" getName not found on given class :" + t.getClass().getName(), e);
-            throw new RuntimeException(String.format("no getName() found on target class %s for matching", t.getClass()
-                    .getName()), e);
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java
deleted file mode 100644
index ff9a65a..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.service.topology.resource.impl;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.UnitTopologyMain;
-import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
-import org.apache.eagle.alert.metadata.resource.IMetadataDao;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-import com.typesafe.config.ConfigFactory;
-
-/**
- * Starts, stops and lists Storm topologies, using the topology and cluster
- * definitions stored in the metadata DAO to decide which Nimbus to talk to.
- */
-public class TopologyMgmtResourceImpl {
-    private static final IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
-    @SuppressWarnings("unused")
-    private static final Logger LOG = LoggerFactory.getLogger(TopologyMgmtResourceImpl.class);
-
-    /** Nimbus host used when no cluster is given or the cluster does not configure one. */
-    private static final String DEFAULT_NIMBUS_HOST = "sandbox.hortonworks.com";
-    /** Nimbus thrift port used when no cluster is given or the cluster does not configure one. */
-    private static final Integer DEFAULT_NIMBUS_THRIFT_PORT = 6627;
-    /** Config path that holds the location of the storm jar. */
-    private static final String STORM_JAR_PATH = "topology.stormJarPath";
-
-    /**
-     * Builds the Storm configuration for the given cluster on top of
-     * {@code Utils.readStormConfig()}.
-     *
-     * @param clusters  clusters to search; when {@code null} they are loaded from the DAO
-     * @param clusterId name of the target cluster, or {@code null} to use the default Nimbus host and port
-     * @return the Storm configuration with Nimbus host and thrift port set
-     * @throws Exception if {@code clusterId} is given but no such cluster exists
-     */
-    @SuppressWarnings({ "rawtypes", "unchecked" })
-    private Map getStormConf(List<StreamingCluster> clusters, String clusterId) throws Exception {
-        Map<String, Object> stormConf = Utils.readStormConfig();
-        if (clusterId == null) {
-            stormConf.put(Config.NIMBUS_HOST, DEFAULT_NIMBUS_HOST);
-            stormConf.put(Config.NIMBUS_THRIFT_PORT, DEFAULT_NIMBUS_THRIFT_PORT);
-            return stormConf;
-        }
-
-        List<StreamingCluster> candidates = clusters != null ? clusters : dao.listClusters();
-        StreamingCluster cluster = TopologyMgmtResourceHelper.findById(candidates, clusterId)
-                .orElseThrow(() -> new Exception("Fail to find cluster: " + clusterId));
-
-        stormConf.put(Config.NIMBUS_HOST,
-                cluster.getDeployments().getOrDefault(StreamingCluster.NIMBUS_HOST, DEFAULT_NIMBUS_HOST));
-        // Fall back to the default port, as is done for the host: Integer.valueOf(null)
-        // would otherwise fail with a NumberFormatException when no port is configured.
-        String thriftPort = cluster.getDeployments().get(StreamingCluster.NIMBUS_THRIFT_PORT);
-        stormConf.put(Config.NIMBUS_THRIFT_PORT,
-                thriftPort == null ? DEFAULT_NIMBUS_THRIFT_PORT : Integer.valueOf(thriftPort));
-        return stormConf;
-    }
-
-    /**
-     * Copies the spout/bolt ids and task counts from the topology config into
-     * the topology definition and stores the definition through the DAO.
-     */
-    private void createTopologyHelper(Topology topologyDef, com.typesafe.config.Config config) {
-        int numOfSpoutTasks = config.getInt(UnitTopologyRunner.SPOUT_TASK_NUM);
-        int numOfRouterBolts = config.getInt(UnitTopologyRunner.ROUTER_TASK_NUM);
-        int numOfAlertBolts = config.getInt(UnitTopologyRunner.ALERT_TASK_NUM);
-        int numOfPublishTasks = config.getInt(UnitTopologyRunner.PUBLISH_TASK_NUM);
-        topologyDef.setSpoutId(UnitTopologyRunner.spoutName);
-        topologyDef.setPubBoltId(UnitTopologyRunner.alertPublishBoltName);
-        topologyDef.setNumOfSpout(numOfSpoutTasks);
-        topologyDef.setNumOfGroupBolt(numOfRouterBolts);
-        topologyDef.setNumOfAlertBolt(numOfAlertBolts);
-        topologyDef.setNumOfPublishBolt(numOfPublishTasks);
-        dao.addTopology(topologyDef);
-    }
-
-    /**
-     * Loads {@code topology-sample-definition.conf}, sets the {@code storm.jar}
-     * system property from it (empty string when the path is absent), stores the
-     * topology definition and builds the Storm topology.
-     */
-    private StormTopology createTopology(Topology topologyDef) {
-        com.typesafe.config.Config topologyConf = ConfigFactory.load("topology-sample-definition.conf");
-        String stormJarPath = "";
-        if (topologyConf.hasPath(STORM_JAR_PATH)) {
-            stormJarPath = topologyConf.getString(STORM_JAR_PATH);
-        }
-        System.setProperty("storm.jar", stormJarPath);
-        createTopologyHelper(topologyDef, topologyConf);
-        return UnitTopologyMain.createTopology(topologyConf);
-    }
-
-    /**
-     * Submits the named topology to Storm. When the DAO holds no definition with
-     * that name, a new definition carrying only the name is used.
-     *
-     * @param topologyName name of the topology to submit
-     * @throws Exception if the Storm configuration cannot be built or the submission fails
-     */
-    public void startTopology(String topologyName) throws Exception {
-        Topology topologyDef = TopologyMgmtResourceHelper.findById(dao.listTopologies(), topologyName)
-                .orElseGet(() -> {
-                    Topology created = new Topology();
-                    created.setName(topologyName);
-                    return created;
-                });
-        StormSubmitter.submitTopology(topologyName, getStormConf(null, topologyDef.getClusterName()),
-                createTopology(topologyDef));
-    }
-
-    /**
-     * Kills the named topology on the Nimbus of the cluster it is defined for.
-     *
-     * @param topologyName name of the topology to kill
-     * @throws Exception if no such topology definition exists or the Nimbus call fails
-     */
-    public void stopTopology(String topologyName) throws Exception {
-        Topology topologyDef = TopologyMgmtResourceHelper.findById(dao.listTopologies(), topologyName)
-                .orElseThrow(() -> new Exception("Fail to find topology " + topologyName));
-        NimbusClient nimbus = NimbusClient.getConfiguredClient(getStormConf(null, topologyDef.getClusterName()));
-        try {
-            nimbus.getClient().killTopology(topologyName);
-        } finally {
-            // Release the thrift connection; it was previously left open.
-            nimbus.close();
-        }
-    }
-
-    /**
-     * Looks up the running topology whose name equals the definition's name,
-     * ignoring case.
-     *
-     * @return the matching summary, or {@code null} when Nimbus reports no such topology
-     */
-    @SuppressWarnings("rawtypes")
-    private TopologySummary getTopologySummery(List<StreamingCluster> clusters, Topology topologyDef) throws Exception {
-        Map stormConf = getStormConf(clusters, topologyDef.getClusterName());
-        NimbusClient nimbus = NimbusClient.getConfiguredClient(stormConf);
-        try {
-            return nimbus.getClient().getClusterInfo().get_topologies().stream()
-                    .filter(summary -> summary.get_name().equalsIgnoreCase(topologyDef.getName()))
-                    .findFirst()
-                    .orElse(null);
-        } finally {
-            // Release the thrift connection; it was previously left open.
-            nimbus.close();
-        }
-    }
-
-    /**
-     * Returns a status entry for every topology definition in the DAO that
-     * Nimbus currently reports; definitions without a running topology are omitted.
-     *
-     * @throws Exception if a cluster cannot be resolved or a Nimbus call fails
-     */
-    public List<TopologyStatus> getTopologies() throws Exception {
-        List<Topology> topologyDefinitions = dao.listTopologies();
-        List<StreamingCluster> clusters = dao.listClusters();
-
-        List<TopologyStatus> topologies = new ArrayList<>();
-        for (Topology topologyDef : topologyDefinitions) {
-            TopologySummary topologySummary = getTopologySummery(clusters, topologyDef);
-            if (topologySummary == null) {
-                continue;
-            }
-            TopologyStatus status = new TopologyStatus();
-            status.setName(topologySummary.get_name());
-            status.setId(topologySummary.get_id());
-            status.setState(topologySummary.get_status());
-            status.setTopology(topologyDef);
-            topologies.add(status);
-        }
-        return topologies;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java
deleted file mode 100644
index c3381d4..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyStatus.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.service.topology.resource.impl;
-
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Plain value holder describing one topology: its name, id and state, the
- * {@link Topology} definition it belongs to, and per-spout / per-bolt load maps.
- *
- * <p>The load maps start out empty and are returned by reference, not copied.
- */
-public class TopologyStatus {
-
-    // Identification and state of the topology.
-    private String name;
-    private String id;
-    private String state;
-
-    // Definition this status was built for.
-    private Topology topology;
-
-    // Load figures keyed by component; empty until a caller fills or replaces them.
-    private Map<String, Double> spoutLoad = new HashMap<>();
-    private Map<String, Double> boltLoad = new HashMap<>();
-
-    /** @return the topology name */
-    public String getName() {
-        return this.name;
-    }
-
-    /** @param name the topology name */
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    /** @return the topology id */
-    public String getId() {
-        return this.id;
-    }
-
-    /** @param id the topology id */
-    public void setId(String id) {
-        this.id = id;
-    }
-
-    /** @return the topology state */
-    public String getState() {
-        return this.state;
-    }
-
-    /** @param state the topology state */
-    public void setState(String state) {
-        this.state = state;
-    }
-
-    /** @return the spout load map held by this object (not a copy) */
-    public Map<String, Double> getSpoutLoad() {
-        return this.spoutLoad;
-    }
-
-    /** @param spoutLoad the map to use as spout load from now on */
-    public void setSpoutLoad(Map<String, Double> spoutLoad) {
-        this.spoutLoad = spoutLoad;
-    }
-
-    /** @return the bolt load map held by this object (not a copy) */
-    public Map<String, Double> getBoltLoad() {
-        return this.boltLoad;
-    }
-
-    /** @param boltLoad the map to use as bolt load from now on */
-    public void setBoltLoad(Map<String, Double> boltLoad) {
-        this.boltLoad = boltLoad;
-    }
-
-    /** @return the topology definition attached to this status */
-    public Topology getTopology() {
-        return this.topology;
-    }
-
-    /** @param topology the topology definition to attach */
-    public void setTopology(Topology topology) {
-        this.topology = topology;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf
deleted file mode 100644
index 0becbfc..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
- "datastore": {
- "metadataDao": "org.apache.eagle.service.alert.resource.impl.InMemMetadataDaoImpl",
- "connection": "localhost:27017"
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml
deleted file mode 100644
index ab7b019..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml
+++ /dev/null
@@ -1,81 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with ~
- this work for additional information regarding copyright ownership. ~ The
- ASF licenses this file to You under the Apache License, Version 2.0 ~ (the
- "License"); you may not use this file except in compliance with ~ the License.
- You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0
- ~ ~ Unless required by applicable law or agreed to in writing, software ~
- distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT
- WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the
- License for the specific language governing permissions and ~ limitations
- under the License. -->
-<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
- http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
- version="3.0">
- <welcome-file-list>
- <welcome-file>index.html</welcome-file>
- </welcome-file-list>
- <servlet>
- <servlet-name>Jersey Web Application</servlet-name>
- <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
- <init-param>
- <param-name>com.sun.jersey.config.property.packages</param-name>
- <param-value>io.swagger.jaxrs.json,io.swagger.jaxrs.listing,org.apache.eagle.service,org.codehaus.jackson.jaxrs</param-value>
- </init-param>
- <init-param>
- <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
- <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;com.sun.jersey.api.container.filter.PostReplaceFilter</param-value>
- </init-param>
- <init-param>
- <param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
- <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
- </init-param>
- <load-on-startup>1</load-on-startup>
- </servlet>
- <!-- Servlet for swagger initialization only, no URL mapping. -->
- <servlet>
- <servlet-name>swaggerConfig</servlet-name>
- <servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class>
- <init-param>
- <param-name>api.version</param-name>
- <param-value>1.0.0</param-value>
- </init-param>
- <init-param>
- <param-name>swagger.api.basepath</param-name>
- <param-value>/api</param-value>
- </init-param>
- <load-on-startup>2</load-on-startup>
- </servlet>
-
- <servlet-mapping>
- <servlet-name>Jersey Web Application</servlet-name>
- <url-pattern>/api/*</url-pattern>
- </servlet-mapping>
- <filter>
- <filter-name>CorsFilter</filter-name>
- <!-- Ideally, should be tomcat embed core's CORSFilter. See @SimpleCORSFiler comments. -->
- <filter-class>org.apache.eagle.alert.resource.SimpleCORSFiler</filter-class>
- <init-param>
- <param-name>cors.allowed.origins</param-name>
- <param-value>*</param-value>
- </init-param>
- <init-param>
- <param-name>cors.allowed.headers</param-name>
- <param-value>Authorization,Origin, No-Cache, X-Requested-With, If-Modified-Since, Pragma, Last-Modified, Cache-Control, Expires, Content-Type, X-E4M-With, Accept</param-value>
- </init-param>
- <init-param>
- <param-name>cors.allowed.methods</param-name>
- <param-value>GET,POST,HEAD,OPTIONS,PUT,DELETE</param-value>
- </init-param>
- <init-param>
- <param-name>cors.support.credentials</param-name>
- <param-value>true</param-value>
- </init-param>
- </filter>
- <filter-mapping>
- <filter-name>CorsFilter</filter-name>
- <url-pattern>/*</url-pattern>
- </filter-mapping>
-</web-app>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html
deleted file mode 100644
index 5da5b32..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html
+++ /dev/null
@@ -1,18 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
-Hello, this is UMP alert metadata service. You are welcome!
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
deleted file mode 100644
index e46213e..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/test/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImplTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.service.topology.resource.impl;
-
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.List;
-
-
-/**
- * Manual tests for {@link TopologyMgmtResourceImpl}. Every test is {@code @Ignore}d,
- * so none of them runs in a normal build.
- */
-public class TopologyMgmtResourceImplTest {
-    TopologyMgmtResourceImpl topologyManager = new TopologyMgmtResourceImpl();
-    String topologyName = "testStartTopology";
-
-    /** Starts the topology and waits ten seconds. */
-    @Ignore
-    @Test
-    public void testStartTopology() throws Exception {
-        topologyManager.startTopology(topologyName);
-        Thread.sleep(10000);
-    }
-
-    /** Starts the topology, waits ten seconds, then stops it. */
-    @Ignore
-    @Test
-    public void testStopTopology() throws Exception {
-        topologyManager.startTopology(topologyName);
-        Thread.sleep(10000);
-        topologyManager.stopTopology(topologyName);
-    }
-
-    /** Starts the topology, waits ten seconds, then expects exactly one reported topology. */
-    @Ignore
-    @Test
-    public void testGetTopologies() throws Exception {
-        topologyManager.startTopology(topologyName);
-        Thread.sleep(10000);
-        List<TopologyStatus> topologies = topologyManager.getTopologies();
-        // assertEquals reports the actual size on failure; assertTrue(size == 1) would not.
-        Assert.assertEquals(1, topologies.size());
-    }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/.gitignore b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/.gitignore
deleted file mode 100644
index 1dd3331..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/.gitignore
+++ /dev/null
@@ -1,2 +0,0 @@
-/target/
-/target/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/pom.xml b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/pom.xml
deleted file mode 100644
index 3789dcd..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/pom.xml
+++ /dev/null
@@ -1,52 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- ~ /* ~ * Licensed to the Apache Software Foundation (ASF) under one
- or more ~ * contributor license agreements. See the NOTICE file distributed
- with ~ * this work for additional information regarding copyright ownership.
- ~ * The ASF licenses this file to You under the Apache License, Version 2.0
- ~ * (the "License"); you may not use this file except in compliance with
- ~ * the License. You may obtain a copy of the License at ~ * ~ * http://www.apache.org/licenses/LICENSE-2.0
- ~ * ~ * Unless required by applicable law or agreed to in writing, software
- ~ * distributed under the License is distributed on an "AS IS" BASIS, ~ *
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ * See the License for the specific language governing permissions and ~
- * limitations under the License. ~ */ -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>io.sherlock</groupId>
- <artifactId>alert-metadata-parent</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- </parent>
-
- <artifactId>alert-metadata</artifactId>
- <packaging>jar</packaging>
-
- <dependencies>
- <!-- Storm depends on org.ow2.asm:asm:4.0 -->
- <!-- Jersey depends on asm:asm:3.0 -->
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>io.sherlock</groupId>
- <artifactId>alert-common</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.mongodb</groupId>
- <artifactId>mongo-java-driver</artifactId>
- <version>${mongodb.version}</version>
- </dependency>
- <dependency>
- <groupId>de.flapdoodle.embed</groupId>
- <artifactId>de.flapdoodle.embed.mongo</artifactId>
- <version>1.50.5</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
deleted file mode 100644
index 1773042..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
+++ /dev/null
@@ -1,333 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.impl;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.function.Predicate;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.apache.eagle.alert.metadata.resource.IMetadataDao;
-import org.apache.eagle.alert.metadata.resource.Models;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-
-/**
- * In-memory {@link IMetadataDao} for simple service start. All state lives in the
- * collections held by this instance.
- *
- * <p>Every service API method is synchronized on this instance. The {@code list*}
- * methods return the internal lists themselves (not copies), so a caller that
- * iterates a returned list while another thread mutates the store still needs its
- * own coordination.
- *
- * @since Apr 11, 2016
- *
- */
-public class InMemMetadataDaoImpl implements IMetadataDao {
-
-    private static final Logger LOG = LoggerFactory.getLogger(InMemMetadataDaoImpl.class);
-
-    private List<StreamingCluster> clusters = new ArrayList<StreamingCluster>();
-    private List<StreamDefinition> schemas = new ArrayList<StreamDefinition>();
-    private List<Kafka2TupleMetadata> datasources = new ArrayList<Kafka2TupleMetadata>();
-    private List<PolicyDefinition> policies = new ArrayList<PolicyDefinition>();
-    private List<Publishment> publishments = new ArrayList<Publishment>();
-    private List<PublishmentType> publishmentTypes = new ArrayList<PublishmentType>();
-    // Threshold checked before each insert in addScheduleState.
-    private volatile int maxScheduleState = 100;
-    // Sorted by version string; firstKey() is evicted first, lastKey() is "the latest".
-    private SortedMap<String, ScheduleState> scheduleStates = new TreeMap<String, ScheduleState>();
-    private List<PolicyAssignment> assignments = new ArrayList<PolicyAssignment>();
-    private List<Topology> topologies = new ArrayList<Topology>();
-
-    /**
-     * @param config accepted so the class can be created reflectively with a
-     *               {@code Config} argument; it is not read
-     */
-    public InMemMetadataDaoImpl(Config config) {
-    }
-
-    @Override
-    public synchronized List<StreamingCluster> listClusters() {
-        return clusters;
-    }
-
-    @Override
-    public synchronized OpResult addCluster(final StreamingCluster cluster) {
-        return addOrReplace(clusters, cluster);
-    }
-
-    /**
-     * Adds {@code paramT} to {@code clusters}, first removing the first element whose
-     * key (see {@link #getKey(Object)}) equals the new element's key, ignoring case.
-     *
-     * @param clusters the backing list to modify
-     * @param paramT   the element to store
-     * @return a result with code 200 and a message telling whether an element was replaced
-     * @throws RuntimeException if no key can be derived for {@code paramT} or a stored element
-     */
-    private synchronized <T> OpResult addOrReplace(List<T> clusters, T paramT) {
-        // Resolve the new element's key once instead of once per stored element.
-        final String key = getKey(paramT);
-        // The null check keeps a null key (on either side) from throwing; it simply never matches.
-        Optional<T> scOp = clusters.stream()
-                .filter(t -> key != null && key.equalsIgnoreCase(getKey(t)))
-                .findFirst();
-
-        OpResult result = new OpResult();
-        // replace
-        if (scOp.isPresent()) {
-            clusters.remove(scOp.get());
-            result.message = "replace the old one!";
-        } else {
-            result.message = "created new config!";
-        }
-        result.code = 200;
-        clusters.add(paramT);
-        return result;
-    }
-
-    /**
-     * Derives the identifying key of a metadata object: the stream id for a
-     * {@link StreamDefinition}, the type for a {@link PublishmentType}, and otherwise
-     * the result of a public no-argument {@code getName()} looked up reflectively.
-     *
-     * @param t the object to derive the key from
-     * @return the key of {@code t}
-     * @throws RuntimeException if {@code getName()} cannot be found or invoked; the
-     *                          reflection failure is attached as the cause
-     */
-    public static <T> String getKey(T t) {
-        if (t instanceof StreamDefinition) {
-            return ((StreamDefinition) t).getStreamId();
-        } else if (t instanceof PublishmentType) {
-            return ((PublishmentType) t).getType();
-        }
-
-        try {
-            Method m = t.getClass().getMethod("getName");
-            return (String) m.invoke(t);
-        } catch (NoSuchMethodException | SecurityException | InvocationTargetException | IllegalAccessException
-                | IllegalArgumentException e) {
-            // Keep the underlying failure in both the log and the thrown exception.
-            LOG.error(" getName not found on given class :" + t.getClass().getName(), e);
-            throw new RuntimeException(String.format("no getName() found on target class %s for matching", t.getClass()
-                    .getName()), e);
-        }
-    }
-
-    /**
-     * Removes every element of {@code clusters} whose key equals {@code id}, ignoring case.
-     *
-     * @param clusters the backing list to modify
-     * @param id       the key to match
-     * @return a result with code 200 and a message telling whether anything was removed
-     */
-    private synchronized <T> OpResult remove(List<T> clusters, String id) {
-        // removeIf replaces the former toArray()/removeAll pair and its unchecked T[] cast.
-        boolean removed = clusters.removeIf(t -> id != null && id.equalsIgnoreCase(getKey(t)));
-
-        OpResult result = new OpResult();
-        result.code = 200;
-        if (removed) {
-            result.message = "removed configuration item succeed";
-        } else {
-            result.message = "no configuration item removed";
-        }
-        return result;
-    }
-
-    @Override
-    public synchronized OpResult removeCluster(final String clusterId) {
-        return remove(clusters, clusterId);
-    }
-
-    @Override
-    public synchronized List<StreamDefinition> listStreams() {
-        return schemas;
-    }
-
-    @Override
-    public synchronized OpResult createStream(StreamDefinition stream) {
-        return addOrReplace(schemas, stream);
-    }
-
-    @Override
-    public synchronized OpResult removeStream(String streamId) {
-        return remove(schemas, streamId);
-    }
-
-    @Override
-    public synchronized List<Kafka2TupleMetadata> listDataSources() {
-        return datasources;
-    }
-
-    @Override
-    public synchronized OpResult addDataSource(Kafka2TupleMetadata dataSource) {
-        return addOrReplace(datasources, dataSource);
-    }
-
-    @Override
-    public synchronized OpResult removeDataSource(String datasourceId) {
-        return remove(datasources, datasourceId);
-    }
-
-    @Override
-    public synchronized List<PolicyDefinition> listPolicies() {
-        return policies;
-    }
-
-    @Override
-    public synchronized OpResult addPolicy(PolicyDefinition policy) {
-        return addOrReplace(policies, policy);
-    }
-
-    @Override
-    public synchronized OpResult removePolicy(String policyId) {
-        return remove(policies, policyId);
-    }
-
-    @Override
-    public synchronized List<Publishment> listPublishment() {
-        return publishments;
-    }
-
-    @Override
-    public synchronized OpResult addPublishment(Publishment publishment) {
-        return addOrReplace(publishments, publishment);
-    }
-
-    @Override
-    public synchronized OpResult removePublishment(String pubId) {
-        return remove(publishments, pubId);
-    }
-
-    @Override
-    public synchronized List<PublishmentType> listPublishmentType() {
-        return publishmentTypes;
-    }
-
-    @Override
-    public synchronized OpResult addPublishmentType(PublishmentType publishmentType) {
-        return addOrReplace(publishmentTypes, publishmentType);
-    }
-
-    @Override
-    public synchronized OpResult removePublishmentType(String pubType) {
-        return remove(publishmentTypes, pubType);
-    }
-
-    @Override
-    public synchronized ScheduleState getScheduleState(String versionId) {
-        return scheduleStates.get(versionId);
-    }
-
-    /**
-     * Stores {@code state} under its version. When the map already holds more than
-     * {@code maxScheduleState} entries, the entry with the smallest version key is evicted.
-     *
-     * @param state the schedule state to store
-     * @return a result with code 200 and message "OK"
-     */
-    @Override
-    public synchronized OpResult addScheduleState(ScheduleState state) {
-        String toRemove = null;
-        if (scheduleStates.size() > maxScheduleState) {
-            toRemove = scheduleStates.firstKey();
-        }
-        scheduleStates.put(state.getVersion(), state);
-        // Never evict the entry that was just written: if the incoming version equals the
-        // eviction candidate, the put above already replaced it.
-        if (toRemove != null && !toRemove.equals(state.getVersion())) {
-            scheduleStates.remove(toRemove);
-        }
-
-        OpResult result = new OpResult();
-        result.code = 200;
-        result.message = "OK";
-        return result;
-    }
-
-    /**
-     * @return the state stored under the greatest version key, or {@code null} when
-     *         no state has been stored
-     */
-    @Override
-    public synchronized ScheduleState getScheduleState() {
-        if (scheduleStates.size() > 0) {
-            return scheduleStates.get(scheduleStates.lastKey());
-        }
-        return null;
-    }
-
-    @Override
-    public synchronized List<PolicyAssignment> listAssignments() {
-        return assignments;
-    }
-
-    /**
-     * Appends {@code assignment}; unlike the other {@code add*} methods, no existing
-     * entry is replaced.
-     */
-    @Override
-    public synchronized OpResult addAssignment(PolicyAssignment assignment) {
-        OpResult result = new OpResult();
-        result.code = 200;
-        result.message = "OK";
-        assignments.add(assignment);
-        return result;
-    }
-
-    @Override
-    public synchronized List<Topology> listTopologies() {
-        return topologies;
-    }
-
-    @Override
-    public synchronized OpResult addTopology(Topology t) {
-        return addOrReplace(topologies, t);
-    }
-
-    @Override
-    public synchronized OpResult removeTopology(String topologyName) {
-        return remove(topologies, topologyName);
-    }
-
-    /**
-     * Empties the assignment, cluster, datasource, policy, publishment, schedule-state,
-     * schema and topology collections.
-     */
-    @Override
-    public synchronized OpResult clear() {
-        // NOTE(review): publishmentTypes is not cleared here, and export()/importModels()
-        // do not carry it either - confirm whether that is intentional.
-        this.assignments.clear();
-        this.clusters.clear();
-        this.datasources.clear();
-        this.policies.clear();
-        this.publishments.clear();
-        this.scheduleStates.clear();
-        this.schemas.clear();
-        this.topologies.clear();
-        OpResult result = new OpResult();
-        result.code = 200;
-        result.message = "OK";
-        return result;
-    }
-
-    /**
-     * Copies the current content of the store into a new {@link Models} instance.
-     */
-    @Override
-    public synchronized Models export() {
-        Models models = new Models();
-        models.assignments.addAll(this.assignments);
-        models.clusters.addAll(this.clusters);
-        models.datasources.addAll(this.datasources);
-        models.policies.addAll(this.policies);
-        models.publishments.addAll(this.publishments);
-        models.scheduleStates.putAll(this.scheduleStates);
-        models.schemas.addAll(this.schemas);
-        models.topologies.addAll(this.topologies);
-        return models;
-    }
-
-    /**
-     * Replaces the content of the store with the content of {@code models}: the store
-     * is cleared first, then every collection of {@code models} is copied in.
-     */
-    @Override
-    public synchronized OpResult importModels(Models models) {
-        clear();
-        this.assignments.addAll(models.assignments);
-        this.clusters.addAll(models.clusters);
-        this.datasources.addAll(models.datasources);
-        this.policies.addAll(models.policies);
-        this.publishments.addAll(models.publishments);
-        this.scheduleStates.putAll(models.scheduleStates);
-        this.schemas.addAll(models.schemas);
-        this.topologies.addAll(models.topologies);
-        OpResult result = new OpResult();
-        result.code = 200;
-        result.message = "OK";
-        return result;
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
deleted file mode 100644
index 94717ce..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MetadataDaoFactory.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.impl;
-
-import java.lang.reflect.Constructor;
-
-import org.apache.eagle.alert.metadata.resource.IMetadataDao;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * Singleton that creates the {@link IMetadataDao} named by the {@code datastore.metadataDao}
- * configuration entry, falling back to {@link InMemMetadataDaoImpl} when the
- * {@code datastore} section is missing or the configured class cannot be created.
- *
- * @since Apr 12, 2016
- *
- */
-public class MetadataDaoFactory {
-
-    // LOG must be declared before INSTANCE: static initializers run in textual order and
-    // the constructor invoked for INSTANCE logs on its fallback paths.
-    private static final Logger LOG = LoggerFactory.getLogger(MetadataDaoFactory.class);
-    private static final MetadataDaoFactory INSTANCE = new MetadataDaoFactory();
-
-    private IMetadataDao dao;
-
-    private MetadataDaoFactory() {
-        Config config = ConfigFactory.load();
-        // Config.getConfig throws on a missing path instead of returning null, so the
-        // presence of the section has to be tested with hasPath.
-        if (!config.hasPath("datastore")) {
-            LOG.warn("datastore is not configured, use in-memory store !!!");
-            dao = new InMemMetadataDaoImpl(ConfigFactory.empty());
-            return;
-        }
-
-        Config datastoreConfig = config.getConfig("datastore");
-        try {
-            // Read inside the try so a missing "metadataDao" key also takes the fallback.
-            String clsName = datastoreConfig.getString("metadataDao");
-            Class<?> clz = Thread.currentThread().getContextClassLoader().loadClass(clsName);
-            if (!IMetadataDao.class.isAssignableFrom(clz)) {
-                throw new IllegalArgumentException(
-                        "metadataDao configuration need to be implementation of IMetadataDao! ");
-            }
-            // The implementation is expected to expose a public constructor taking a Config.
-            Constructor<?> cotr = clz.getConstructor(Config.class);
-            dao = (IMetadataDao) cotr.newInstance(datastoreConfig);
-        } catch (Exception e) {
-            LOG.error("error when initialize the dao, fall back to in memory mode!", e);
-            dao = new InMemMetadataDaoImpl(datastoreConfig);
-        }
-    }
-
-    /**
-     * @return the single factory instance
-     */
-    public static MetadataDaoFactory getInstance() {
-        return INSTANCE;
-    }
-
-    /**
-     * @return the DAO created when the factory was initialised
-     */
-    public IMetadataDao getMetadataDao() {
-        return dao;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
deleted file mode 100644
index a46f0c7..0000000
--- a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/MongoMetadataDaoImpl.java
+++ /dev/null
@@ -1,380 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.metadata.impl;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.PublishmentType;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.apache.eagle.alert.metadata.resource.IMetadataDao;
-import org.apache.eagle.alert.metadata.resource.Models;
-import org.apache.eagle.alert.metadata.resource.OpResult;
-import org.bson.BsonDocument;
-import org.bson.BsonInt32;
-import org.bson.BsonString;
-import org.bson.Document;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.DeserializationFeature;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.mongodb.Function;
-import com.mongodb.MongoClient;
-import com.mongodb.client.MongoCollection;
-import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.IndexOptions;
-import com.mongodb.client.model.UpdateOptions;
-import com.mongodb.client.result.DeleteResult;
-import com.mongodb.client.result.UpdateResult;
-import com.typesafe.config.Config;
-
-/**
- * @since Apr 11, 2016
- *
- */
-public class MongoMetadataDaoImpl implements IMetadataDao {
-
- private static final String DB_NAME = "ump_alert_metadata";
- private static final Logger LOG = LoggerFactory.getLogger(MongoMetadataDaoImpl.class);
- private static final ObjectMapper mapper = new ObjectMapper();
- static {
- mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- }
-
- private final String connection;
- private final MongoClient client;
-
- private MongoDatabase db;
- private MongoCollection<Document> cluster;
- private MongoCollection<Document> schema;
- private MongoCollection<Document> datasource;
- private MongoCollection<Document> policy;
- private MongoCollection<Document> publishment;
- private MongoCollection<Document> publishmentType;
- private MongoCollection<Document> scheduleStates;
- private MongoCollection<Document> assignments;
- private MongoCollection<Document> topologies;
-
- public MongoMetadataDaoImpl(Config config) {
- this.connection = config.getString("connection");
- this.client = new MongoClient(connection);
- init();
- }
-
- private void init() {
- db = client.getDatabase(DB_NAME);
- IndexOptions io = new IndexOptions().background(true).unique(true).name("nameIndex");
- BsonDocument doc = new BsonDocument();
- doc.append("name", new BsonInt32(1));
- cluster = db.getCollection("clusters");
- cluster.createIndex(doc, io);
- {
- BsonDocument doc2 = new BsonDocument();
- doc2.append("streamId", new BsonInt32(1));
- schema = db.getCollection("schemas");
- schema.createIndex(doc2, io);
- }
- datasource = db.getCollection("datasources");
- datasource.createIndex(doc, io);
- policy = db.getCollection("policies");
- policy.createIndex(doc, io);
- publishment = db.getCollection("publishments");
- publishment.createIndex(doc, io);
- topologies = db.getCollection("topologies");
- topologies.createIndex(doc, io);
-
- publishmentType = db.getCollection("publishmentTypes");
- {
- IndexOptions io1 = new IndexOptions().background(true).unique(true).name("pubTypeIndex");
- BsonDocument doc1 = new BsonDocument();
- doc1.append("type", new BsonInt32(1));
- publishmentType.createIndex(doc1, io1);
- }
-
- scheduleStates = db.getCollection("schedule_specs");
- {
- IndexOptions io1 = new IndexOptions().background(true).unique(true).name("nameIndex");
- BsonDocument doc1 = new BsonDocument();
- doc1.append("version", new BsonInt32(1));
- scheduleStates.createIndex(doc1, io1);
- }
-
- assignments = db.getCollection("assignments");
- {
- IndexOptions io1 = new IndexOptions().background(true).unique(true).name("policyNameIndex");
- BsonDocument doc1 = new BsonDocument();
- doc1.append("policyName", new BsonInt32(1));
- assignments.createIndex(doc1, io1);
- }
- }
-
- @Override
- public List<StreamingCluster> listClusters() {
- return list(cluster, StreamingCluster.class);
- }
-
- private <T> List<T> list(MongoCollection<Document> collection, Class<T> clz) {
- List<T> result = new LinkedList<T>();
- collection.find().map(new Function<Document, T>() {
- @Override
- public T apply(Document t) {
- String json = t.toJson();
- try {
- return mapper.readValue(json, clz);
- } catch (IOException e) {
- LOG.error("deserialize config item failed!", e);
- }
- return null;
- }
- }).into(result);
- return result;
- }
-
- private <T> OpResult addOrReplace(MongoCollection<Document> collection, T t) {
- BsonDocument filter = new BsonDocument();
- if (t instanceof StreamDefinition) {
- filter.append("streamId", new BsonString(InMemMetadataDaoImpl.getKey(t)));
- } else {
- filter.append("name", new BsonString(InMemMetadataDaoImpl.getKey(t)));
- }
-
- String json = "";
- OpResult result = new OpResult();
- try {
- json = mapper.writeValueAsString(t);
- UpdateOptions options = new UpdateOptions();
- options.upsert(true);
- UpdateResult ur = collection.replaceOne(filter, Document.parse(json), options);
- // FIXME: could based on matched count do better matching...
- if (ur.getModifiedCount() > 0 || ur.getUpsertedId() != null) {
- result.code = 200;
- result.message = String.format("update %d configuration item.", ur.getModifiedCount());
- } else {
- result.code = 500;
- result.message = "no configuration item create/updated.";
- }
- } catch (Exception e) {
- result.code = 500;
- result.message = e.getMessage();
- LOG.error("", e);
- }
- return result;
- }
-
- private <T> OpResult remove(MongoCollection<Document> collection, String name) {
- BsonDocument filter = new BsonDocument();
- filter.append("name", new BsonString(name));
- DeleteResult dr = collection.deleteOne(filter);
- OpResult result = new OpResult();
- result.code = 200;
- result.message = String.format(" %d config item removed!", dr.getDeletedCount());
- return result;
- }
-
- @Override
- public OpResult addCluster(StreamingCluster cluster) {
- return addOrReplace(this.cluster, cluster);
- }
-
- @Override
- public OpResult removeCluster(String clusterId) {
- return remove(cluster, clusterId);
- }
-
- @Override
- public List<StreamDefinition> listStreams() {
- return list(schema, StreamDefinition.class);
- }
-
- @Override
- public OpResult createStream(StreamDefinition stream) {
- return addOrReplace(this.schema, stream);
- }
-
- @Override
- public OpResult removeStream(String streamId) {
- return remove(schema, streamId);
- }
-
- @Override
- public List<Kafka2TupleMetadata> listDataSources() {
- return list(datasource, Kafka2TupleMetadata.class);
- }
-
- @Override
- public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
- return addOrReplace(this.datasource, dataSource);
- }
-
- @Override
- public OpResult removeDataSource(String datasourceId) {
- return remove(datasource, datasourceId);
- }
-
- @Override
- public List<PolicyDefinition> listPolicies() {
- return list(policy, PolicyDefinition.class);
- }
-
- @Override
- public OpResult addPolicy(PolicyDefinition policy) {
- return addOrReplace(this.policy, policy);
- }
-
- @Override
- public OpResult removePolicy(String policyId) {
- return remove(policy, policyId);
- }
-
- @Override
- public List<Publishment> listPublishment() {
- return list(publishment, Publishment.class);
- }
-
- @Override
- public OpResult addPublishment(Publishment publishment) {
- return addOrReplace(this.publishment, publishment);
- }
-
- @Override
- public OpResult removePublishment(String pubId) {
- return remove(publishment, pubId);
- }
-
- @Override
- public List<PublishmentType> listPublishmentType() {
- return list(publishmentType, PublishmentType.class);
- }
-
- @Override
- public OpResult addPublishmentType(PublishmentType pubType) {
- return addOrReplace(this.publishmentType, pubType);
- }
-
- @Override
- public OpResult removePublishmentType(String pubType) {
- return remove(publishmentType, pubType);
- }
-
- @Override
- public ScheduleState getScheduleState(String versionId) {
- BsonDocument doc = new BsonDocument();
- doc.append("version", new BsonString(versionId));
- ScheduleState state = scheduleStates.find(doc).map(new Function<Document, ScheduleState>() {
- @Override
- public ScheduleState apply(Document t) {
- String json = t.toJson();
- try {
- return mapper.readValue(json, ScheduleState.class);
- } catch (IOException e) {
- LOG.error("deserialize config item failed!", e);
- }
- return null;
- }
- }).first();
- return state;
- }
-
- @Override
- public OpResult addScheduleState(ScheduleState state) {
- return addOne(scheduleStates, state);
- }
-
- private <T> OpResult addOne(MongoCollection<Document> collection, T t) {
- OpResult result = new OpResult();
- try {
- String json = mapper.writeValueAsString(t);
- collection.insertOne(Document.parse(json));
- result.code = 200;
- result.message = "add state succeed!";
- } catch (Exception e) {
- result.code = 400;
- result.message = e.getMessage();
- LOG.error("", e);
- }
- return result;
- }
-
- @Override
- public ScheduleState getScheduleState() {
- BsonDocument sort = new BsonDocument();
- sort.append("generateTime", new BsonInt32(-1));
- ScheduleState state = scheduleStates.find().sort(sort).map(new Function<Document, ScheduleState>() {
- @Override
- public ScheduleState apply(Document t) {
- String json = t.toJson();
- try {
- return mapper.readValue(json, ScheduleState.class);
- } catch (IOException e) {
- LOG.error("deserialize config item failed!", e);
- }
- return null;
- }
- }).first();
- return state;
- }
-
- @Override
- public List<PolicyAssignment> listAssignments() {
- return list(assignments, PolicyAssignment.class);
- }
-
- @Override
- public OpResult addAssignment(PolicyAssignment assignment) {
- return addOne(assignments, assignment);
- }
-
- @Override
- public List<Topology> listTopologies() {
- return list(topologies, Topology.class);
- }
-
- @Override
- public OpResult addTopology(Topology t) {
- return addOrReplace(this.topologies, t);
- }
-
- @Override
- public OpResult removeTopology(String topologyName) {
- return remove(topologies, topologyName);
- }
-
- @Override
- public OpResult clear() {
- throw new UnsupportedOperationException("clear not support!");
- }
-
- @Override
- public Models export() {
- throw new UnsupportedOperationException("export not support!");
- }
-
- @Override
- public OpResult importModels(Models models) {
- throw new UnsupportedOperationException("importModels not support!");
- }
-
-}