You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/14 06:23:09 UTC
[09/13] incubator-eagle git commit: EAGLE-341 clean inner process
alert engine code clean inner process alert engine code
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java
deleted file mode 100644
index e399189..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationResource.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.service.alert;
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.entity.SiteDescServiceEntity;
-import org.apache.eagle.service.generic.GenericEntityServiceResource;
-import org.apache.eagle.alert.entity.ApplicationDescServiceEntity;
-import org.apache.eagle.alert.entity.SiteApplicationServiceEntity;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.type.TypeFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.ws.rs.*;
-import javax.ws.rs.core.MediaType;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.*;
-
-@Path(SiteApplicationResource.ROOT_PATH)
-public class SiteApplicationResource {
- private final static Logger LOG = LoggerFactory.getLogger(SiteApplicationResource.class);
- private final static GenericEntityServiceResource resource = new GenericEntityServiceResource();
- public final static String ROOT_PATH = "/module";
-
- @Path("site")
- @DELETE
- @Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.APPLICATION_JSON)
- public GenericServiceAPIResponseEntity deleteSite(@QueryParam("site") String site) {
- String siteQuery = Constants.SITE_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@site=\"" + site + "\"]{*}";
- String siteApplicationQuery = Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + "\"]{*}";
- int pageSize = Integer.MAX_VALUE;
-
- GenericServiceAPIResponseEntity response = resource.deleteByQuery(siteQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
- if(response.isSuccess()) {
- response = resource.deleteByQuery(siteApplicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
- if(!response.isSuccess()) {
- LOG.error(response.getException());
- }
- } else {
- LOG.error(response.getException());
- }
- return response;
- }
-
- @Path("application")
- @DELETE
- @Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.APPLICATION_JSON)
- public GenericServiceAPIResponseEntity deleteApplication(@QueryParam("application") String application) {
- String applicationQuery = Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@application=\"" + application + "\"]{*}";
- String siteApplicationQuery = Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME + "[@application=\"" + application + "\"]{*}";
- int pageSize = Integer.MAX_VALUE;
-
- GenericServiceAPIResponseEntity response = resource.deleteByQuery(applicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
- if(response.isSuccess()) {
- response = resource.deleteByQuery(siteApplicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
- if(!response.isSuccess()) {
- LOG.error(response.getException());
- }
- } else {
- LOG.error(response.getException());
- }
- return response;
- }
-
- @Path("feature")
- @DELETE
- @Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.APPLICATION_JSON)
- public GenericServiceAPIResponseEntity deleteFeature(@QueryParam("feature") String feature) {
- String featureQuery = Constants.FEATURE_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@feature=\"" + feature + "\"]{*}";
- String applicationQuery = Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME + "[]{*}";
- int pageSize = Integer.MAX_VALUE;
-
- GenericServiceAPIResponseEntity response = resource.deleteByQuery(featureQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
- if(response.isSuccess()) {
- response = resource.search(applicationQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
- if(response.isSuccess()) {
- List<ApplicationDescServiceEntity> entityList = response.getObj();
- Boolean isModified = false;
- for(ApplicationDescServiceEntity entity : entityList) {
- if(entity.getFeatures().contains(feature)) {
- List<String> features = entity.getFeatures();
- features.remove(feature);
- entity.setFeatures(features);
- isModified = true;
- }
- }
- if(isModified) {
- response = resource.updateEntities(entityList, Constants.APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME);
- }
- }
- }
- if(!response.isSuccess()) {
- LOG.error(response.getException());
- }
- return response;
- }
-
- @Path("siteApplication")
- @POST
- @Consumes(MediaType.APPLICATION_JSON)
- @Produces(MediaType.APPLICATION_JSON)
- public GenericServiceAPIResponseEntity createSiteApplications(InputStream inputStream) {
- GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity<>();
- int pageSize = Integer.MAX_VALUE;
- try {
- List<SiteApplicationObject> entities = (List<SiteApplicationObject>) unmarshalSiteApplicationEntities(inputStream);
- if(entities == null) {
- throw new IllegalArgumentException("cannot convert to SiteApplicationObject");
- }
- List<SiteDescServiceEntity> siteEntities = new LinkedList<>();
- List<SiteApplicationServiceEntity> applicationEntities = new LinkedList<>();
- Set<String> sites = new HashSet<>();
- for(SiteApplicationObject e : entities) {
- sites.add(e.getTags().get("site"));
- SiteDescServiceEntity entity = new SiteDescServiceEntity();
- entity.setEnabled(e.getEnabled());
- entity.setTags(e.getTags());
- siteEntities.add(entity);
- applicationEntities.addAll(e.getApplications());
- }
- response = resource.updateEntities(siteEntities, Constants.SITE_DESCRIPTION_SERVICE_ENDPOINT_NAME);
- if(response.isSuccess()) {
- String query = buildQueryWithAttributeList(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME, "site", sites);
- LOG.info("query=" + query);
- response = resource.search(query, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
- if(response.isSuccess()) {
- List<SiteApplicationServiceEntity> applications = response.getObj();
- for(SiteApplicationServiceEntity app : applications) {
- app.setEnabled(false);
- }
- response = resource.updateEntities(applications, Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME);
- if(response.isSuccess()) {
- response = resource.updateEntities(applicationEntities, Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME);
- }
- }
- }
- if(!response.isSuccess()) {
- LOG.error(response.getException());
- }
- } catch (Exception ex) {
- LOG.error(ex.getMessage(), ex);
- response.setException(ex);
- }
- return response;
- }
-
- private String buildQueryWithAttributeList(String serviceName, String attr, Set<String> sets) {
- StringBuilder builder = new StringBuilder(serviceName + "[");
- String attribute = "@" + attr + "=";
- String condition = " OR ";
- for(String s : sets) {
- String value = String.format("\"%s\"", s);
- builder.append(attribute + value);
- builder.append(condition);
- }
- String result = builder.substring(0, builder.length()-condition.length());
- result = result + "]{*}";
- return result;
- }
-
- private List<? extends TaggedLogAPIEntity> unmarshalSiteApplicationEntities(InputStream inputStream) throws IllegalAccessException, InstantiationException, IOException {
- ObjectMapper objectMapper = new ObjectMapper();
- return objectMapper.readValue(inputStream, TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, SiteApplicationObject.class));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java
deleted file mode 100644
index 0ec9cf4..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/resolver/AttributeResolveResponse.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.alert.resolver;
-
-import java.util.List;
-
-public class AttributeResolveResponse<V> {
- private String exception;
- private List<V> values;
-
- public List<V> getValues() {
- return values;
- }
-
- public void setValues(List<V> values) {
- this.values = values;
- }
-
- public String getException() {
- return exception;
- }
-
- public void setException(String exception) {
- this.exception = exception;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
deleted file mode 100644
index 3833f47..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.service.alert.SiddhiAlertPolicyValidateProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider b/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
deleted file mode 100644
index 3833f47..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/test/resources/META-INF/services/org.apache.eagle.service.alert.AlertPolicyValidateProvider
+++ /dev/null
@@ -1,16 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-org.apache.eagle.service.alert.SiddhiAlertPolicyValidateProvider
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/pom.xml b/eagle-core/eagle-alert-parent/pom.xml
index bc35fc9..9dbacf0 100644
--- a/eagle-core/eagle-alert-parent/pom.xml
+++ b/eagle-core/eagle-alert-parent/pom.xml
@@ -31,9 +31,6 @@
<name>eagle-alert-parent</name>
<modules>
<module>eagle-alert</module>
- <module>eagle-alert-base</module>
- <module>eagle-alert-process</module>
<module>eagle-alert-service</module>
- <module>eagle-alert-notification-plugin</module>
</modules>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml b/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml
deleted file mode 100644
index 919188f..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- ~
- -->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>eagle-app-parent</artifactId>
- <groupId>org.apache.eagle</groupId>
- <version>0.5.0-incubating-SNAPSHOT</version>
- </parent>
-
- <modelVersion>4.0.0</modelVersion>
- <artifactId>eagle-stream-application-manager</artifactId>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-application-service</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-stream-pipeline</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.storm</groupId>
- <artifactId>storm-core</artifactId>
- <version>${storm.version}</version>
- <exclusions>
- <exclusion>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- <version>${scala.version}.0</version>
- </dependency>
- <dependency>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest_${scala.version}</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-actor_${scala.version}</artifactId>
- <version>${akka.actor.version}</version>
- </dependency>
- <dependency>
- <groupId>com.typesafe.akka</groupId>
- <artifactId>akka-testkit_${scala.version}</artifactId>
- <version>${akka.actor.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <scope>compile</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.scala-tools</groupId>
- <artifactId>maven-scala-plugin</artifactId>
- <executions>
- <execution>
- <id>scala-compile-first</id>
- <phase>process-resources</phase>
- <goals>
- <goal>add-source</goal>
- <goal>compile</goal>
- </goals>
- </execution>
- <execution>
- <id>scala-test-compile</id>
- <phase>process-test-resources</phase>
- <goals>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <skipTests>true</skipTests>
- </configuration>
- </plugin>
- <plugin>
- <groupId>org.scalatest</groupId>
- <artifactId>scalatest-maven-plugin</artifactId>
- <configuration>
- <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
- <junitxml>.</junitxml>
- <filereports>TestSuite.txt</filereports>
- </configuration>
- <executions>
- <execution>
- <id>test</id>
- <goals>
- <goal>test</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
-</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java
deleted file mode 100644
index d382629..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application;
-
-
-public class TopologyException extends Exception {
- public TopologyException(String s, Exception e) { super(s,e); }
- public TopologyException(Exception e) { super(e); }
- public TopologyException(String s) { super(s); }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java
deleted file mode 100644
index 8f625c7..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application;
-
-
-import com.typesafe.config.Config;
-
-
-public interface TopologyExecutable {
- void submit(String topology, Config config);
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
deleted file mode 100644
index e32f48e..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application;
-
-
-import com.typesafe.config.Config;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-
-public final class TopologyFactory {
- public static Logger LOG = LoggerFactory.getLogger(TopologyFactory.class);
- private final static Map<String, TopologyExecutable> topologyCache = Collections.synchronizedMap(new HashMap<String, TopologyExecutable>());
- public static TopologyExecutable getTopologyInstance(String topologyClass) throws TopologyException {
- TopologyExecutable instance;
- if(topologyCache.containsKey(topologyClass)){
- instance = topologyCache.get(topologyClass);
- } else {
- try {
- LOG.info("load class " + topologyClass + "with classLoader " + TopologyFactory.class.getClassLoader().toString());
- instance = (TopologyExecutable) Class.forName(topologyClass).newInstance();
- topologyCache.put(topologyClass, instance);
- } catch (ClassNotFoundException e) {
- throw new TopologyException("Topology in type of " + topologyClass + " is not found",e);
- } catch (InstantiationException | IllegalAccessException e) {
- throw new TopologyException(e);
- }
- }
- return instance;
- }
-
- public static void submit(String topologyClass, Config config) throws TopologyException {
- TopologyExecutable topology = getTopologyInstance(topologyClass);
- topology.submit(topologyClass, config);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
index 3e918cc..b5fbf59 100644
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
@@ -20,13 +20,11 @@ package org.apache.eagle.stream.application
import com.typesafe.config.Config
import org.apache.eagle.datastream.core.StreamContext
-import org.apache.eagle.stream.pipeline.Pipeline
trait AbstractDynamicApplication extends TopologyExecutable {
def compileStream(application: String, config: Config): StreamContext = {
- val pipeline = Pipeline.parseStringWithConfig(application, config)
- Pipeline.compile(pipeline)
+ null
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
deleted file mode 100644
index bbfaedd..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import java.util
-
-import com.google.common.base.Preconditions
-import org.apache.eagle.service.application.entity.TopologyExecutionStatus
-import org.apache.eagle.stream.application.impl.StormExecutionPlatform
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.JavaConversions
-
-
-object ApplicationManager {
- private val LOG: Logger = LoggerFactory.getLogger(ApplicationManager.getClass)
- private val workerMap: util.Map[AnyRef, TaskExecutor] = new util.TreeMap[AnyRef, TaskExecutor]
-
- def getWorkerMap: util.Map[AnyRef, TaskExecutor] = {
- return workerMap
- }
-
- def submit(id: AnyRef, runnable: Runnable): TaskExecutor = {
- if (workerMap.containsKey(id)) {
- val executor: Thread = workerMap.get(id)
- if (!executor.isAlive || executor.getState.equals() ) {
- LOG.info("Replacing dead executor: {}", executor)
- workerMap.remove(id)
- }
- else {
- throw new IllegalArgumentException("Duplicated id '" + id + "'")
- }
- }
- val worker: TaskExecutor = new TaskExecutor(runnable)
- LOG.info("Registering new executor %s: %s".format(id, worker))
- workerMap.put(id, worker)
- worker.setName(id.toString)
- worker.setDaemon(true)
- worker.start
- return worker
- }
-
- def get(id: AnyRef): TaskExecutor = {
- Preconditions.checkArgument(workerMap.containsKey(id))
- return workerMap.get(id)
- }
-
- @throws(classOf[Exception])
- def stop(id: AnyRef): TaskExecutor = {
- val worker: TaskExecutor = get(id)
- worker.interrupt
- //this.workerMap.remove(id)
- return worker
- }
-
- def getWorkerStatus(state: Thread.State): String = {
- if (whereIn(state, java.lang.Thread.State.RUNNABLE, java.lang.Thread.State.TIMED_WAITING, java.lang.Thread.State.WAITING)) {
- return TopologyExecutionStatus.STARTED
- }
- else if (whereIn(state, java.lang.Thread.State.NEW)) {
- return TopologyExecutionStatus.STARTING
- }
- else if (whereIn(state, java.lang.Thread.State.TERMINATED)) {
- return TopologyExecutionStatus.STOPPED
- }
- throw new IllegalStateException("Unknown state: " + state)
- }
-
- def getTopologyStatus(status: String): String = {
- if(whereIn(status, StormExecutionPlatform.KILLED))
- return TopologyExecutionStatus.STOPPING
- return TopologyExecutionStatus.STARTED
- }
-
- private def whereIn(status: String, inStatuses: String*): Boolean = {
- for (_status <- inStatuses) {
- if (_status.equalsIgnoreCase(status)) {
- return true
- }
- }
- return false
- }
- private def whereIn(state: Thread.State, inStates: Thread.State*): Boolean = {
- for (_state <- inStates) {
- if (_state eq state) {
- return true
- }
- }
- return false
- }
-
- def remove(id: AnyRef) {
- val executor: TaskExecutor = this.get(id)
- if (executor.isAlive) {
- throw new RuntimeException("Failed to remove alive executor '" + id + "'")
- }
- else {
- this.workerMap.remove(id)
- }
- }
-
- def stopAll(): Unit ={
- JavaConversions.collectionAsScalaIterable(workerMap.values()) foreach { worker =>
- if(!worker.isInterrupted) {
- worker.interrupt()
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
deleted file mode 100644
index 4c2df77..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import com.typesafe.config.Config
-import org.apache.eagle.service.application.AppManagerConstants
-import org.apache.eagle.service.application.entity.TopologyExecutionEntity
-
-
-/**
- * Stateless helpers for deriving topology names and Storm UI links.
- */
-object ApplicationManagerUtils {
-
-  /**
-   * Builds the full topology name as `eagle-<site>-<application>-<topology>`
-   * from the corresponding fields of the execution entity.
-   */
-  def generateTopologyFullName(topologyExecution: TopologyExecutionEntity) =
-    "eagle-%s-%s-%s".format(
-      topologyExecution.getSite,
-      topologyExecution.getApplication,
-      topologyExecution.getTopology)
-
-  /**
-   * Builds the topology page URL for the given topology id.
-   *
-   * The cluster base URL is read from `AppManagerConstants.CLUSTER_URL` when
-   * that path exists in `config`; otherwise
-   * `AppManagerConstants.DEFAULT_CLUSTER_URL` is used.
-   */
-  def buildStormTopologyURL(config: Config, topologyID: String): String = {
-    val clusterURL =
-      if (config.hasPath(AppManagerConstants.CLUSTER_URL)) {
-        config.getString(AppManagerConstants.CLUSTER_URL)
-      } else {
-        AppManagerConstants.DEFAULT_CLUSTER_URL
-      }
-    clusterURL + "/topology.html?id=" + topologyID
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
deleted file mode 100644
index ae0f6e8..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import java.util
-import java.util.concurrent.Callable
-
-import akka.dispatch.Futures
-import com.typesafe.config.Config
-import org.apache.eagle.alert.entity.SiteApplicationServiceEntity
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity
-import org.apache.eagle.policy.common.Constants
-import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, TopologyExecutionEntity, TopologyExecutionStatus, TopologyOperationEntity}
-import org.apache.eagle.service.client.EagleServiceConnector
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl
-import org.slf4j.{Logger, LoggerFactory}
-
-import scala.collection.JavaConversions
-import scala.concurrent.ExecutionContext
-
-
-/**
- * Asynchronous DAO for topology operation, execution and description entities.
- *
- * Every public operation wraps its work in a `Callable` that is handed to
- * `Futures.future` together with the execution context `ex`. Each call creates
- * its own [[EagleServiceClientImpl]] on top of the shared connector and always
- * closes it when the call finishes, whether it succeeds or throws.
- *
- * @param config configuration used to build the service connector
- * @param ex     execution context the futures are scheduled on
- */
-class ApplicationSchedulerAsyncDAO(config: Config, ex: ExecutionContext) {
-  private val LOG: Logger = LoggerFactory.getLogger(classOf[ApplicationSchedulerAsyncDAO])
-  private val connector: EagleServiceConnector = new EagleServiceConnector(config)
-
-  /**
-   * Creates a new service client on top of this DAO's connector.
-   * The caller owns the returned client and has to close it.
-   */
-  def getEagleServiceClient(): EagleServiceClientImpl = {
-    new EagleServiceClientImpl(connector)
-  }
-
-  /**
-   * Runs `body` with a freshly created client and closes the client afterwards,
-   * also when `body` throws.
-   */
-  private def withClient[T](body: EagleServiceClientImpl => T): T = {
-    val client = getEagleServiceClient()
-    try {
-      body(client)
-    } finally {
-      client.close()
-    }
-  }
-
-  /**
-   * Loads all topology operations whose status equals `status`.
-   * The future fails with an Exception when the query is unsuccessful or
-   * carries no result object.
-   */
-  def readOperationsByStatus(status: String) = {
-    Futures.future(new Callable[util.List[TopologyOperationEntity]]{
-      override def call(): util.List[TopologyOperationEntity] = withClient { client =>
-        val query = "%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, status)
-        val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] = client.search(query).pageSize(Int.MaxValue).send()
-        if (!response.isSuccess || response.getObj == null) {
-          throw new Exception(s"Fail to load operations with status $status")
-        }
-        response.getObj
-      }
-    }, ex)
-  }
-
-  /**
-   * Loads every topology execution whose status differs from
-   * `TopologyExecutionStatus.NEW`.
-   * The future fails with an Exception when the query is unsuccessful or
-   * carries no result object.
-   */
-  def loadAllTopologyExecutionEntities() = {
-    Futures.future(new Callable[util.List[TopologyExecutionEntity]]{
-      override def call(): util.List[TopologyExecutionEntity] = withClient { client =>
-        val query = "%s[@status != \"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, TopologyExecutionStatus.NEW)
-        val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] = client.search(query).pageSize(Int.MaxValue).send()
-        if (!response.isSuccess || response.getObj == null) {
-          throw new Exception(response.getException)
-        }
-        response.getObj
-      }
-    }, ex)
-  }
-
-  /**
-   * Loads the single topology execution identified by site, application and
-   * topology name.
-   * The future fails with an Exception when the query is unsuccessful or when
-   * it does not match exactly one entity.
-   */
-  def loadTopologyExecutionByName(site: String, appName: String, topologyName: String) = {
-    Futures.future(new Callable[TopologyExecutionEntity]{
-      override def call(): TopologyExecutionEntity = withClient { client =>
-        val query = "%s[@site=\"%s\" AND @application=\"%s\" AND @topology=\"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, site, appName, topologyName)
-        LOG.info(s"query=$query")
-        val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] = client.search(query).pageSize(Int.MaxValue).send()
-        if (!response.isSuccess || response.getObj == null) {
-          throw new Exception(s"Fail to load topologyExecutionEntity with application=$appName topology=$topologyName due to Exception: ${response.getException}")
-        }
-        if (response.getObj.size() != 1) {
-          throw new Exception(s"Get 0 or more than 1 topologyExecutionEntity with application=$appName topology=$topologyName")
-        }
-        response.getObj.get(0)
-      }
-    }, ex)
-  }
-
-  /**
-   * Loads the description of `topologyName` and sets its context to the
-   * configuration of the matching site application (`site` / `application`).
-   * Both lookups share one client. The future fails with an Exception when
-   * either query is unsuccessful or returns no entity.
-   */
-  def loadTopologyDescriptionByName(site: String, application: String, topologyName: String) = {
-    Futures.future(new Callable[TopologyDescriptionEntity]{
-      override def call(): TopologyDescriptionEntity = withClient { client =>
-        val descriptionQuery = "%s[@topology=\"%s\"]{*}".format(Constants.TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME, topologyName)
-        val response: GenericServiceAPIResponseEntity[TopologyDescriptionEntity] = client.search(descriptionQuery).pageSize(Int.MaxValue).send()
-        if (!response.isSuccess || response.getObj == null || response.getObj.size() == 0) {
-          throw new Exception(s"Fail to load TopologyDescriptionEntity with site=$site application=$application topology=$topologyName due to Exception: ${response.getException}")
-        }
-        val topologyDescriptionEntity = response.getObj.get(0)
-
-        val query = "%s[@site=\"%s\" AND @application=\"%s\"]{*}".format(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME, site, application)
-        val configResponse: GenericServiceAPIResponseEntity[SiteApplicationServiceEntity] = client.search(query).pageSize(Int.MaxValue).send()
-        if (!configResponse.isSuccess || configResponse.getObj == null || configResponse.getObj.size() == 0) {
-          throw new Exception(s"Fail to load topology configuration with query=$query due to Exception: ${configResponse.getException}")
-        }
-        val siteApplicationEntity = configResponse.getObj.get(0)
-        topologyDescriptionEntity.setContext(siteApplicationEntity.getConfig)
-        topologyDescriptionEntity
-      }
-    }, ex)
-  }
-
-  /**
-   * Stamps `operation` with the current time and persists it.
-   * The future fails with a RuntimeException when the update is unsuccessful.
-   */
-  def updateOperationStatus(operation: TopologyOperationEntity) = {
-    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
-      override def call(): GenericServiceAPIResponseEntity[String] = withClient { client =>
-        if (LOG.isDebugEnabled()) LOG.debug(s"Updating status of command[$operation] as ${operation.getStatus}")
-        operation.setLastModifiedDate(System.currentTimeMillis())
-        val response = client.update(java.util.Arrays.asList(operation), classOf[TopologyOperationEntity])
-        if (response.isSuccess) {
-          LOG.info(s"Updated operation status [$operation] as: ${operation.getStatus}")
-        } else {
-          LOG.error(s"Failed to update status as ${operation.getStatus} of command[$operation]")
-          throw new RuntimeException(s"Failed to update command due to exception: ${response.getException}")
-        }
-        response
-      }
-    }, ex)
-  }
-
-  /**
-   * Stamps `topology` with the current time and persists it.
-   * An unsuccessful update is logged and the response is returned as is.
-   */
-  def updateTopologyExecutionStatus(topology: TopologyExecutionEntity) = {
-    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
-      override def call(): GenericServiceAPIResponseEntity[String] = withClient { client =>
-        if (LOG.isDebugEnabled()) LOG.debug(s"Updating status of app[$topology] as ${topology.getStatus}")
-        topology.setLastModifiedDate(System.currentTimeMillis())
-        // The update must run while the client is still open; it is closed by withClient afterwards.
-        val response = client.update(java.util.Arrays.asList(topology), classOf[TopologyExecutionEntity])
-        if (response.isSuccess) {
-          LOG.info(s"Updated status application[$topology] as: ${topology.getStatus}")
-        } else {
-          LOG.error(s"Failed to update status as ${topology.getStatus} of application[$topology] due to ${response.getException}")
-        }
-        response
-      }
-    }, ex)
-  }
-
-  /**
-   * Marks every operation that is still PENDING as FAILED and persists the change.
-   * When the lookup is unsuccessful or finds nothing, an empty response entity
-   * is returned and nothing is updated.
-   */
-  def clearPendingOperations() = {
-    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
-      override def call(): GenericServiceAPIResponseEntity[String] = withClient { client =>
-        LOG.info("start to clear operation")
-        val query: String = "%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, TopologyOperationEntity.OPERATION_STATUS.PENDING)
-        val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] = client.search(query).pageSize(Int.MaxValue).send()
-        val pendingOperations: util.List[TopologyOperationEntity] = response.getObj
-        // Guard against a successful response without a result list.
-        if (response.isSuccess && pendingOperations != null && !pendingOperations.isEmpty) {
-          val failedOperations: util.List[TopologyOperationEntity] = new util.ArrayList[TopologyOperationEntity]
-          JavaConversions.collectionAsScalaIterable(pendingOperations) foreach { operation =>
-            operation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
-            failedOperations.add(operation)
-          }
-          val ret: GenericServiceAPIResponseEntity[String] = client.update(failedOperations, Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME)
-          if (ret.isSuccess) {
-            LOG.info(s"Successfully clear ${failedOperations.size()} pending operations")
-          } else {
-            LOG.error(s"Failed to clear pending operations due to exception:" + ret.getException)
-          }
-          ret
-        } else {
-          new GenericServiceAPIResponseEntity[String]()
-        }
-      }
-    }, ex)
-  }
-}
-
-
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
deleted file mode 100644
index 88271bb..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import com.typesafe.config.Config
-import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyDescriptionEntity}
-
-
-/**
- * Contract for a platform that can start, stop and query topologies.
- * All operations return Unit; implementations presumably report their outcome
- * by mutating the passed execution entity (confirm against implementors).
- */
-trait ExecutionPlatform {
-  /** Starts `topology` for the given execution using `config`. */
-  def start(topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit
-
-  /** Stops the topology behind `topologyExecution`. */
-  def stop(topologyExecution: TopologyExecutionEntity, config: Config): Unit
-
-  /** Queries the status of several topology executions. */
-  def status(topologyExecutions: java.util.List[TopologyExecutionEntity], config: Config): Unit
-
-  /** Queries the status of a single topology execution. */
-  def status(topologyExecution: TopologyExecutionEntity, config: Config): Unit
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
deleted file mode 100644
index 6b9c033..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import org.apache.eagle.service.application.AppManagerConstants
-import org.apache.eagle.stream.application.impl.StormExecutionPlatform
-import org.slf4j.{LoggerFactory, Logger}
-
-import scala.collection.mutable
-
-
-/**
- * Factory that creates execution platforms on demand and caches one instance
- * per manager type.
- */
-object ExecutionPlatformFactory {
-  private val LOG: Logger = LoggerFactory.getLogger(ExecutionPlatformFactory.getClass)
-
-  var managerCache = new mutable.HashMap[String, ExecutionPlatform] with
-    mutable.SynchronizedMap[String, ExecutionPlatform]
-
-  /**
-   * Returns the cached platform for `managerType`, creating it on first use.
-   *
-   * Lookup and insertion happen inside a single synchronized
-   * `getOrElseUpdate`, so concurrent callers cannot cache two different
-   * instances for the same type.
-   *
-   * @param managerType platform type, e.g. `AppManagerConstants.EAGLE_CLUSTER_STORM`
-   * @throws Exception if `managerType` is not a supported type; nothing is cached then
-   */
-  def getApplicationManager(managerType: String): ExecutionPlatform = {
-    managerCache.getOrElseUpdate(managerType, createPlatform(managerType))
-  }
-
-  /** Instantiates the platform for `managerType` or throws for an unknown type. */
-  private def createPlatform(managerType: String): ExecutionPlatform = {
-    managerType match {
-      case AppManagerConstants.EAGLE_CLUSTER_STORM =>
-        new StormExecutionPlatform
-      case _ =>
-        throw new Exception(s"Invalid managerType $managerType")
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
deleted file mode 100644
index 07737ac..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application
-
-import org.codehaus.jackson.annotate.JsonIgnore
-
-/**
- * Thread that runs the given runnable and adds shutdown/restart helpers.
- * The two overridden getters only delegate to Thread; they are redeclared so
- * that they can carry the @JsonIgnore annotation.
- */
-class TaskExecutor(runnable: Runnable) extends Thread(runnable) {
-
-  @JsonIgnore override def getContextClassLoader: ClassLoader =
-    super.getContextClassLoader
-
-  @JsonIgnore override def getUncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
-    super.getUncaughtExceptionHandler
-
-  /** Requests termination by interrupting this thread. */
-  def shutdown: Unit = {
-    this.interrupt()
-  }
-
-  /**
-   * Interrupts this thread and then calls start on it.
-   *
-   * NOTE(review): java.lang.Thread.start throws IllegalThreadStateException
-   * when the thread was already started, so this only succeeds for a thread
-   * that has not been started yet - confirm the intended semantics.
-   */
-  def restart: Unit = {
-    this.interrupt()
-    this.start()
-  }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
deleted file mode 100644
index 7d52649..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.impl
-
-import com.typesafe.config.Config
-import org.apache.eagle.datastream.ExecutionEnvironments
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment
-import org.apache.eagle.stream.application.AbstractDynamicApplication
-import org.slf4j.LoggerFactory
-
-
-/**
- * Dynamic application that compiles a stream definition and submits it to a
- * Storm execution environment.
- */
-object StormDynamicTopology extends AbstractDynamicApplication {
-  val LOG = LoggerFactory.getLogger(classOf[AbstractDynamicApplication])
-
-  /**
-   * Compiles `application` with `config` and submits the resulting stream.
-   *
-   * Compilation errors propagate to the caller. Failures while obtaining the
-   * environment or submitting are best-effort: they are logged together with
-   * their stack trace and not rethrown.
-   *
-   * @param application stream definition passed to compileStream
-   * @param config      configuration passed to compileStream
-   */
-  override def submit(application: String, config: Config): Unit = {
-    val stream = compileStream(application, config)
-
-    try {
-      val stormEnv = ExecutionEnvironments.getWithConfig[StormExecutionEnvironment](stream.getConfig)
-      stream.submit(stormEnv)
-    } catch {
-      case e: Throwable =>
-        // Pass the throwable as well so the stack trace is not lost.
-        LOG.error(e.toString, e)
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
deleted file mode 100644
index af4cafa..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
+++ /dev/null
@@ -1,197 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.impl
-
-import java.net.URLDecoder
-import java.nio.file.{Files, Paths}
-
-import backtype.storm.generated.InvalidTopologyException
-import backtype.storm.utils.{NimbusClient, Utils}
-import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.eagle.common.config.EagleConfigConstants
-import org.apache.eagle.service.application.AppManagerConstants
-import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, TopologyExecutionEntity, TopologyExecutionStatus}
-import org.apache.eagle.stream.application.{ApplicationManager, ApplicationManagerUtils, ExecutionPlatform, TopologyFactory}
-import org.slf4j.LoggerFactory
-
-import scala.collection.JavaConversions
-
-/**
- * Topology status names used when interpreting Storm topology summaries.
- */
-object StormExecutionPlatform {
-  val ACTIVE = "ACTIVE"
-  val INACTIVE = "INACTIVE"
-  val KILLED = "KILLED"
-  val REBALANCING = "REBALANCING"
-}
-
-/**
- * ExecutionPlatform implementation backed by Storm.
- *
- * Topologies run either in local mode (inside a worker thread managed by
- * ApplicationManager) or in cluster mode (submitted to / queried from Nimbus).
- * Results are reported by updating the passed TopologyExecutionEntity.
- */
-class StormExecutionPlatform extends ExecutionPlatform {
-  val LOG = LoggerFactory.getLogger(classOf[StormExecutionPlatform])
-
-  /**
-   * Builds a Nimbus client from the Storm config plus command line options,
-   * overridden by `envContextConfig.nimbusHost` and
-   * `envContextConfig.nimbusThriftPort` when present in `appConfig`.
-   * The caller has to close the returned client.
-   */
-  private def getNimbusClient(appConfig: com.typesafe.config.Config): NimbusClient = {
-    val conf = Utils.readStormConfig().asInstanceOf[java.util.HashMap[String, Object]]
-    conf.putAll(Utils.readCommandLineOpts().asInstanceOf[java.util.HashMap[String, Object]])
-
-    if (appConfig.hasPath("envContextConfig.nimbusHost")) {
-      LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_HOST} as ${appConfig.getString("envContextConfig.nimbusHost")}")
-      conf.put(backtype.storm.Config.NIMBUS_HOST, appConfig.getString("envContextConfig.nimbusHost"))
-    }
-
-    if (appConfig.hasPath("envContextConfig.nimbusThriftPort")) {
-      LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_THRIFT_PORT} as ${appConfig.getString("envContextConfig.nimbusThriftPort")}")
-      conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, appConfig.getNumber("envContextConfig.nimbusThriftPort"))
-    }
-    NimbusClient.getConfiguredClient(conf)
-  }
-
-  /**
-   * Runs `action` with a Nimbus client and closes the client afterwards,
-   * also when `action` throws.
-   */
-  private def withNimbusClient[T](appConfig: Config)(action: NimbusClient => T): T = {
-    val nimbus = getNimbusClient(appConfig)
-    try {
-      action(nimbus)
-    } finally {
-      nimbus.close()
-    }
-  }
-
-  /**
-   * Submits the topology to a worker thread via ApplicationManager and records
-   * the worker's state, full name and a description on `topologyExecution`.
-   * Errors raised inside the worker are logged, not rethrown.
-   */
-  def startLocal(topologyName: String, topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
-    val worker: Thread = ApplicationManager.submit(topologyName, new Runnable {
-      override def run(): Unit = {
-        try {
-          topology.getType.toUpperCase() match {
-            case TopologyDescriptionEntity.TYPE.CLASS =>
-              TopologyFactory.submit(topology.getExeClass, config)
-            case TopologyDescriptionEntity.TYPE.DYNAMIC =>
-              StormDynamicTopology.submit(topology.getExeClass, config)
-            case _ =>
-              LOG.error("Unsupported topology type: " + topology.getType)
-          }
-        } catch {
-          case ex: Throwable =>
-            LOG.error(s"topology $topologyName in local mode is interrupted with ${ex.toString}")
-        }
-      }
-    })
-    topologyExecution.setFullName(topologyName)
-    topologyExecution.setStatus(ApplicationManager.getWorkerStatus(worker.getState))
-    topologyExecution.setDescription("Running inside " + worker.toString + " in local mode")
-  }
-
-  /**
-   * Starts a topology.
-   *
-   * Locates the jar that contains ExecutionPlatform, publishes it through the
-   * `storm.jar` system property, derives the full topology name and then
-   * starts the topology either locally or by submitting it, depending on the
-   * configured running mode (local mode when none is configured).
-   *
-   * @throws Exception                if the jar cannot be found or is not a `.jar` file
-   * @throws InvalidTopologyException if the topology type is unsupported (cluster mode)
-   */
-  override def start(topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
-    val stormJarPath: String = URLDecoder.decode(classOf[ExecutionPlatform].getProtectionDomain.getCodeSource.getLocation.getPath, "UTF-8")
-    if (stormJarPath == null || !Files.exists(Paths.get(stormJarPath)) || !stormJarPath.endsWith(".jar")) {
-      val errMsg = s"storm jar file $stormJarPath does not exists, or is a invalid jar file"
-      LOG.error(errMsg)
-      throw new Exception(errMsg)
-    }
-    LOG.info(s"Detected a storm.jar location at: $stormJarPath")
-    System.setProperty("storm.jar", stormJarPath)
-
-    // Layer the generated topology name on top of the supplied configuration.
-    val fullName = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
-    val extConfig = ConfigFactory.parseString("envContextConfig.topologyName=%s".format(fullName))
-    val newConfig = extConfig.withFallback(config)
-
-    val mode = if (config.hasPath(AppManagerConstants.RUNNING_MODE)) config.getString(AppManagerConstants.RUNNING_MODE) else EagleConfigConstants.LOCAL_MODE
-    topologyExecution.setMode(mode)
-    if (topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
-      startLocal(fullName, topology, topologyExecution, newConfig)
-    } else {
-      topology.getType.toUpperCase() match {
-        case TopologyDescriptionEntity.TYPE.CLASS =>
-          TopologyFactory.submit(topology.getExeClass, newConfig)
-        case TopologyDescriptionEntity.TYPE.DYNAMIC =>
-          StormDynamicTopology.submit(topology.getExeClass, newConfig)
-        case _ =>
-          throw new InvalidTopologyException("Unsupported topology type: " + topology.getType)
-      }
-      // Note: the execution status is not updated here in cluster mode.
-      topologyExecution.setFullName(fullName)
-    }
-  }
-
-  /**
-   * Stops a topology: stops the local worker in local mode, otherwise asks
-   * Nimbus to kill the topology and marks the execution as STOPPING.
-   */
-  override def stop(topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
-    val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
-
-    if (topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
-      stopLocal(name, topologyExecution)
-    } else {
-      withNimbusClient(config) { nimbus =>
-        nimbus.getClient.killTopology(name)
-      }
-      topologyExecution.setStatus(TopologyExecutionStatus.STOPPING)
-    }
-  }
-
-  /**
-   * Stops the local worker registered under `name` and records its resulting
-   * state on `topologyExecution`. The worker stays registered.
-   */
-  def stopLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = {
-    val taskWorker = ApplicationManager.stop(name)
-    topologyExecution.setStatus(ApplicationManager.getWorkerStatus(taskWorker.getState))
-    topologyExecution.setDescription(s"topology status is ${taskWorker.getState}")
-  }
-
-
-  /**
-   * Looks up the cluster's topology summary named `topologyName`.
-   *
-   * @return Some(summary) when Nimbus reports a topology with that name, None otherwise
-   */
-  def getTopology(topologyName: String, config: Config) = {
-    withNimbusClient(config) { nimbus =>
-      val topologySummaries = nimbus.getClient.getClusterInfo.get_topologies
-      JavaConversions.collectionAsScalaIterable(topologySummaries).find { t => t.get_name.equals(topologyName) }
-    }
-  }
-
-  /**
-   * Refreshes status, URL and description of one execution. In cluster mode a
-   * topology unknown to Nimbus is reported as STOPPED with empty URL and
-   * description.
-   */
-  override def status(topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
-    val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
-
-    if (topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
-      statusLocal(name, topologyExecution)
-    } else {
-      getTopology(name, config) match {
-        case Some(summary) =>
-          topologyExecution.setStatus(ApplicationManager.getTopologyStatus(summary.get_status()))
-          topologyExecution.setUrl(ApplicationManagerUtils.buildStormTopologyURL(config, summary.get_id()))
-          topologyExecution.setDescription(summary.toString)
-        case None =>
-          topologyExecution.setStatus(TopologyExecutionStatus.STOPPED)
-          topologyExecution.setUrl("")
-          topologyExecution.setDescription("")
-      }
-    }
-  }
-
-  /**
-   * Refreshes the status of a local-mode execution from its worker thread.
-   *
-   * A changed status is logged and stored; an unchanged STOPPED status removes
-   * the worker from ApplicationManager. Any failure (for example a lookup that
-   * throws) is treated as STOPPED with an empty description.
-   */
-  def statusLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = {
-    try {
-      val currentStatus = topologyExecution.getStatus()
-      val newStatus = ApplicationManager.getWorkerStatus(ApplicationManager.get(name).getState())
-      if (!currentStatus.equals(newStatus)) {
-        LOG.info("Status of topology: %s changed from %s to %s".format(topologyExecution.getFullName, currentStatus, newStatus))
-        topologyExecution.setStatus(newStatus)
-        topologyExecution.setDescription(String.format("Status of topology: %s changed from %s to %s", name, currentStatus, newStatus))
-      } else if (currentStatus.equalsIgnoreCase(TopologyExecutionStatus.STOPPED)) {
-        ApplicationManager.remove(name)
-      }
-    } catch {
-      case ex: Throwable =>
-        // Keep the best-effort fallback, but leave a trace of why it was taken.
-        LOG.debug(s"Unable to resolve local status of topology $name, reporting it as stopped", ex)
-        topologyExecution.setDescription("")
-        topologyExecution.setStatus(TopologyExecutionStatus.STOPPED)
-    }
-  }
-
-  /** Refreshes the status of every execution in `topologyExecutions`, in order. */
-  override def status(topologyExecutions: java.util.List[TopologyExecutionEntity], config: Config): Unit = {
-    JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach {
-      topologyExecution => status(topologyExecution, config)
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
deleted file mode 100644
index 8fbf60d..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import java.util.concurrent.Callable
-
-import akka.actor.{Actor, ActorLogging}
-import akka.dispatch.Futures
-import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigSyntax}
-import org.apache.eagle.common.config.EagleConfigConstants
-import org.apache.eagle.service.application.AppManagerConstants
-import org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION
-import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyExecutionStatus, TopologyOperationEntity}
-import org.apache.eagle.stream.application.{ApplicationSchedulerAsyncDAO, ExecutionPlatformFactory}
-
-import scala.collection.JavaConversions
-import scala.util.{Failure, Success}
-
-
-private[scheduler] class AppCommandExecutor extends Actor with ActorLogging {
- @volatile var _config: Config = _
- @volatile var _dao: ApplicationSchedulerAsyncDAO = _
-
- import context.dispatcher
-
- def start(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = {
- val options: ConfigParseOptions = ConfigParseOptions.defaults.setSyntax(ConfigSyntax.PROPERTIES).setAllowMissing(false)
- _dao.loadTopologyDescriptionByName(topologyOperation.getSite, topologyOperation.getApplication, topologyOperation.getTopology) onComplete {
- case Success(topology) =>
- val topologyConfig: Config = ConfigFactory.parseString(topology.getContext, options)
-
- if(!topologyConfig.hasPath(EagleConfigConstants.APP_CONFIG)) {
- topologyOperation.setMessage("Fail to detect topology configuration")
- topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
- _dao.updateOperationStatus(topologyOperation)
- } else {
- val config = topologyConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(_config)
- val clusterType = if(config.hasPath(AppManagerConstants.CLUSTER_ENV)) config.getString(AppManagerConstants.CLUSTER_ENV) else AppManagerConstants.EAGLE_CLUSTER_STORM
- topologyExecution.setEnvironment(clusterType)
-
- Futures.future(new Callable[TopologyExecutionEntity]{
- override def call(): TopologyExecutionEntity = {
- topologyExecution.setStatus(TopologyExecutionStatus.STARTING)
- _dao.updateTopologyExecutionStatus(topologyExecution)
- ExecutionPlatformFactory.getApplicationManager(clusterType).start(topology, topologyExecution, config)
- topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
- topologyExecution
- }
- }, context.dispatcher) onComplete {
- case Success(topologyExecutionEntity) =>
- topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
- updateStatus(topologyExecution, topologyOperation)
- case Failure(ex) =>
- topologyOperation.setMessage(ex.getMessage)
- topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
- _dao.updateOperationStatus(topologyOperation)
- }
- }
-
- case Failure(ex) =>
- topologyOperation.setMessage(ex.getMessage)
- topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
- _dao.updateOperationStatus(topologyOperation)
- }
- }
-
- def stop(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = {
- val clusterType = topologyExecution.getEnvironment
-
- Futures.future(new Callable[TopologyExecutionEntity]{
- override def call(): TopologyExecutionEntity = {
- topologyExecution.setStatus(TopologyExecutionStatus.STOPPING)
- _dao.updateTopologyExecutionStatus(topologyExecution)
- ExecutionPlatformFactory.getApplicationManager(clusterType).stop(topologyExecution, _config)
- topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
- topologyExecution
- }
- }, context.dispatcher) onComplete {
- case Success(topologyExecutionEntity) =>
- topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
- updateStatus(topologyExecution, topologyOperation)
- case Failure(ex) =>
- topologyOperation.setMessage(ex.toString)
- topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
- _dao.updateOperationStatus(topologyOperation)
- }
- }
-
- def status(topologyExecution: TopologyExecutionEntity) = {
- val clusterType = topologyExecution.getEnvironment
-
- Futures.future(new Callable[TopologyExecutionEntity]{
- override def call(): TopologyExecutionEntity = {
- ExecutionPlatformFactory.getApplicationManager(clusterType).status(topologyExecution, _config)
- topologyExecution
- }
- }, context.dispatcher) onComplete {
- case _ =>
- _dao.updateTopologyExecutionStatus(topologyExecution)
- }
- }
-
- def updateStatus(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = {
- _dao.updateOperationStatus(topologyOperation)
- _dao.updateTopologyExecutionStatus(topologyExecution)
- }
-
- def execute(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = {
- try {
- topologyOperation.getOperation match {
- case OPERATION.START =>
- start(topologyExecution, topologyOperation)
- case OPERATION.STOP =>
- stop(topologyExecution, topologyOperation)
- case m@_ =>
- log.warning("Unsupported operation: " + topologyOperation)
- throw new Exception(s"Unsupported operation: ${topologyOperation.getOperation}, possible values are START/STOP")
- }
- } catch {
- case e: Throwable =>
- topologyOperation.setMessage(e.getMessage)
- topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
- _dao.updateOperationStatus(topologyOperation)
- }
- }
-
- override def receive = {
- case InitializationEvent(config: Config) =>
- _config = config
- _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher)
- case SchedulerCommand(topologyExecution, topologyOperation) =>
- execute(topologyExecution, topologyOperation)
- case HealthCheckerEvent =>
- _dao.loadAllTopologyExecutionEntities() onComplete {
- case Success(topologyExecutions) =>
- log.info(s"Load ${topologyExecutions.size()} topologies in execution")
- JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach { topologyExecution =>
- try{
- status(topologyExecution)
- } catch {
- case ex: Throwable =>
- log.error(ex.getMessage)
- }
- }
- case Failure(ex) =>
- log.error(s"Fail to load any topologyExecutionEntity due to Exception: ${ex.getMessage}")
- }
- case TerminatedEvent =>
- context.stop(self)
- case m@_ =>
- log.warning("Unsupported operation $m")
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
deleted file mode 100644
index c731846..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import akka.actor.{Actor, ActorLogging}
-import com.typesafe.config.Config
-import org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION_STATUS
-import org.apache.eagle.stream.application.ApplicationSchedulerAsyncDAO
-
-import scala.collection.JavaConversions
-import scala.util.{Failure, Success}
-
-
-private[scheduler] class AppCommandLoader extends Actor with ActorLogging {
- @volatile var _config: Config = null
- @volatile var _dao: ApplicationSchedulerAsyncDAO = null
-
- import context.dispatcher
-
- override def receive = {
- case InitializationEvent(config: Config) =>
- _config = config
- _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher)
- case ClearPendingOperation =>
- if(_dao == null) _dao = new ApplicationSchedulerAsyncDAO(_config, context.dispatcher)
- _dao.clearPendingOperations()
- case CommandLoaderEvent => {
- val _sender = sender()
- _dao.readOperationsByStatus(OPERATION_STATUS.INITIALIZED) onComplete {
- case Success(commands) => {
- log.info(s"Load ${commands.size()} new commands")
- JavaConversions.collectionAsScalaIterable(commands) foreach { command =>
- command.setStatus(OPERATION_STATUS.PENDING)
- _dao.updateOperationStatus(command) onComplete {
- case Success(response) =>
- _dao.loadTopologyExecutionByName(command.getSite, command.getApplication, command.getTopology) onComplete {
- case Success(topologyExecution) => {
- _sender ! SchedulerCommand(topologyExecution, command)
- }
- case Failure(ex) =>
- log.error(ex.getMessage)
- command.setMessage(ex.getMessage)
- command.setStatus(OPERATION_STATUS.FAILED)
- _dao.updateOperationStatus(command)
- }
- case Failure(ex) =>
- log.error(s"Got an exception to update command status $command: ${ex.getMessage}")
- command.setMessage(ex.getMessage)
- command.setStatus(OPERATION_STATUS.FAILED)
- _dao.updateOperationStatus(command)
- }
- }
- }
- case Failure(ex) =>
- log.error(s"Failed to get commands due to exception ${ex.getMessage}")
- }
- }
- case TerminatedEvent =>
- context.stop(self)
- case m@_ => throw new UnsupportedOperationException(s"Event is not supported $m")
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
deleted file mode 100644
index 476a3fb..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import akka.actor.{ActorSystem, Props}
-import com.typesafe.config.Config
-import org.apache.eagle.service.application.AppManagerConstants
-import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyOperationEntity}
-import org.apache.eagle.stream.application.ApplicationManager
-
-import scala.concurrent.duration._
-
-
-private[scheduler] class ScheduleEvent
-private[scheduler] case class InitializationEvent(config: Config) extends ScheduleEvent
-private[scheduler] case class TerminatedEvent() extends ScheduleEvent
-private[scheduler] case class CommandLoaderEvent() extends ScheduleEvent
-private[scheduler] case class HealthCheckerEvent() extends ScheduleEvent
-private[scheduler] case class ClearPendingOperation() extends ScheduleEvent
-private[scheduler] case class SchedulerCommand(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) extends ScheduleEvent
-
-case class EagleServiceUnavailableException(message:String) extends Exception(message)
-case class DuplicatedDefinitionException(message:String) extends Exception(message)
-case class LoadTopologyFailureException(message:String) extends Exception(message)
-
-
-/**
- * 1. Sync command from eagle service
- * 2. Coordinate command to different actor
- * 3. Actor execute command as requested
- */
-class ApplicationScheduler {
- //val config = ConfigFactory.load()
- val DEFAULT_COMMAND_LOADER_INTERVAL_SECS = 2
- val DEFAULT_HEALTH_CHECK_INTERVAL_SECS = 5
-
- def start(config: Config) = {
- val system = ActorSystem("application-manager-scheduler", config)
- system.log.info(s"Started actor system: $system")
-
- import system.dispatcher
-
- val commandLoaderIntervalSecs: Long = if(config.hasPath(AppManagerConstants.APP_COMMAND_LOADER_INTERVAL_SECS)) config.getLong(AppManagerConstants.APP_COMMAND_LOADER_INTERVAL_SECS) else DEFAULT_COMMAND_LOADER_INTERVAL_SECS
- val healthCheckIntervalSecs: Long = if(config.hasPath(AppManagerConstants.APP_HEALTH_CHECK_INTERVAL_SECS)) config.getLong(AppManagerConstants.APP_HEALTH_CHECK_INTERVAL_SECS) else DEFAULT_HEALTH_CHECK_INTERVAL_SECS
-
- val coordinator = system.actorOf(Props[StreamAppCoordinator])
- system.scheduler.scheduleOnce(0 seconds, coordinator, InitializationEvent(config))
- system.scheduler.scheduleOnce(1 seconds, coordinator, ClearPendingOperation)
- system.scheduler.schedule(2.seconds, commandLoaderIntervalSecs.seconds, coordinator, CommandLoaderEvent)
- system.scheduler.schedule(10.seconds, healthCheckIntervalSecs.seconds, coordinator, HealthCheckerEvent)
-
- /*
- registerOnTermination is called when you have shut down the ActorSystem (system.shutdown),
- and the callbacks will be executed after all actors have been stopped.
- */
- system.registerOnTermination(new Runnable {
- override def run(): Unit = {
- coordinator ! TerminatedEvent
- ApplicationManager.stopAll()
- }
- })
- system
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
deleted file mode 100644
index 17006ee..0000000
--- a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.stream.application.scheduler
-
-import akka.actor.{Actor, ActorLogging, ActorRef, Props}
-
-private[scheduler] class StreamAppCoordinator extends Actor with ActorLogging {
- var commandLoader: ActorRef = null
- var commandExecutor: ActorRef = null
-
-
- override def preStart(): Unit = {
- commandLoader = context.actorOf(Props[AppCommandLoader], "command-loader")
- commandExecutor = context.actorOf(Props[AppCommandExecutor], "command-worker")
- }
-
- override def receive = {
- case InitializationEvent(config) => {
- log.info(s"Config updated: $config")
- commandLoader ! InitializationEvent(config)
- commandExecutor ! InitializationEvent(config)
- }
- case ClearPendingOperation =>
- commandLoader ! ClearPendingOperation
- case CommandLoaderEvent =>
- commandLoader ! CommandLoaderEvent
- case command: SchedulerCommand =>
- log.info(s"Executing command: $SchedulerCommand")
- commandExecutor ! command
- case HealthCheckerEvent =>
- commandExecutor ! HealthCheckerEvent
- case TerminatedEvent =>
- log.info("Coordinator exit ...")
- context.stop(self)
- case m@_ =>
- log.warning(s"Coordinator Unsupported message: $m")
- }
-}