You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/07/22 16:54:13 UTC
[1/3] incubator-eagle git commit: [EAGLE-386] Refactor Application
Framework Interfaces and StreamEventMapper
Repository: incubator-eagle
Updated Branches:
refs/heads/develop e21b073f6 -> e73e35da0
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/resources/providers-disabled.xml
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/providers-disabled.xml b/eagle-server/src/main/resources/providers-disabled.xml
new file mode 100644
index 0000000..b434715
--- /dev/null
+++ b/eagle-server/src/main/resources/providers-disabled.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<providers>
+ <provider>
+ <jarPath>target/apache-eagle-example-app.jar</jarPath>
+ <className>org.apache.eagle.app.example.ExampleApplicationProvider</className>
+ </provider>
+ <provider>
+ <jarPath>target/apache-eagle-example-app.jar</jarPath>
+ <className>org.apache.eagle.app.example.ExampleApplicationProvider2</className>
+ </provider>
+</providers>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/resources/providers.xml
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/providers.xml b/eagle-server/src/main/resources/providers.xml
deleted file mode 100644
index b434715..0000000
--- a/eagle-server/src/main/resources/providers.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-<providers>
- <provider>
- <jarPath>target/apache-eagle-example-app.jar</jarPath>
- <className>org.apache.eagle.app.example.ExampleApplicationProvider</className>
- </provider>
- <provider>
- <jarPath>target/apache-eagle-example-app.jar</jarPath>
- <className>org.apache.eagle.app.example.ExampleApplicationProvider2</className>
- </provider>
-</providers>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/WEB-INF/web.xml b/eagle-server/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..9b8c5ac
--- /dev/null
+++ b/eagle-server/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<web-app xmlns="http://java.sun.com/xml/ns/javaee"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
+ http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
+ version="3.0">
+ <welcome-file-list>
+ <welcome-file>index.html</welcome-file>
+ </welcome-file-list>
+
+
+ <servlet>
+ <servlet-name>Jersey Web Application</servlet-name>
+ <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
+ <init-param>
+ <param-name>com.sun.jersey.config.property.packages</param-name>
+ <param-value>io.swagger.jaxrs.json,io.swagger.jaxrs.listing,org.apache.eagle,org.codehaus.jackson.jaxrs</param-value>
+ </init-param>
+ <init-param>
+ <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
+ <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;com.sun.jersey.api.container.filter.PostReplaceFilter</param-value>
+ </init-param>
+ <init-param>
+ <param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
+ <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
+ </init-param>
+ <load-on-startup>1</load-on-startup>
+ </servlet>
+ <!-- Servlet for swagger initialization only, no URL mapping. -->
+ <servlet>
+ <servlet-name>swaggerConfig</servlet-name>
+ <servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class>
+ <init-param>
+ <param-name>api.version</param-name>
+ <param-value>1.0.0</param-value>
+ </init-param>
+ <init-param>
+ <param-name>swagger.api.basepath</param-name>
+ <param-value>/rest</param-value>
+ </init-param>
+ <load-on-startup>2</load-on-startup>
+ </servlet>
+ <servlet-mapping>
+ <servlet-name>Jersey Web Application</servlet-name>
+ <url-pattern>/rest/*</url-pattern>
+ </servlet-mapping>
+ <filter>
+ <filter-name>CorsFilter</filter-name>
+ <!-- TODO: replace with Tomcat's built-in CORS filter (org.apache.catalina.filters.CorsFilter); see also the metadata resource -->
+ <filter-class>org.apache.eagle.alert.resource.SimpleCORSFiler</filter-class>
+ <init-param>
+ <param-name>cors.allowed.origins</param-name>
+ <param-value>*</param-value>
+ </init-param>
+ <init-param>
+ <param-name>cors.allowed.headers</param-name>
+ <param-value>Authorization,Origin, No-Cache, X-Requested-With, If-Modified-Since, Pragma, Last-Modified, Cache-Control, Expires, Content-Type, X-E4M-With, Accept</param-value>
+ </init-param>
+ <init-param>
+ <param-name>cors.allowed.methods</param-name>
+ <param-value>GET,POST,HEAD,OPTIONS,PUT,DELETE</param-value>
+ </init-param>
+ <init-param>
+ <param-name>cors.support.credentials</param-name>
+ <param-value>true</param-value>
+ </init-param>
+ </filter>
+ <filter-mapping>
+ <filter-name>CorsFilter</filter-name>
+ <url-pattern>/*</url-pattern>
+ </filter-mapping>
+</web-app>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d7d0ccb..ccae27f 100755
--- a/pom.xml
+++ b/pom.xml
@@ -809,6 +809,17 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>io.dropwizard</groupId>
+ <artifactId>dropwizard-servlets</artifactId>
+ <version>${dropwizard.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-jersey-jaxrs</artifactId>
<version>${swagger.version}</version>
[3/3] incubator-eagle git commit: [EAGLE-386] Refactor Application
Framework Interfaces and StreamEventMapper
Posted by ha...@apache.org.
[EAGLE-386] Refactor Application Framework Interfaces and StreamEventMapper
[Description]
h1. Application Framework Interfaces
Application Context (Runtime): org.apache.eagle.app.ApplicationContext
Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity
Application Processing Logic (Execution): org.apache.eagle.app.Application
Application Lifecycle Listener (Installation): org.apache.eagle.app.ApplicationLifecycleListener
h1. StreamEventMapper (Flatten/Direct)
* FlattenEventMapper
* DirectEventMapper
h1. Metadata.xml Path Best Practice
/META-INF/apps/example/metadata.xml
[Changes]
Decouple Tuple to Event mapping with StreamEventMapper
Rename EventMapper and Application#buildApp
Integrate ApplicationContext with ApplicationLifecycleListener to make sure the onAppInstall/onAppUninstall callbacks are invoked
Clarify application framework interfaces and docs
Clarify ApplicationContext as single entrance of app metadata/processing/lifecycle
Clean up application service code
Better metadata.xml practice: /META-INF/apps/example/metadata.xml
Fix ApplicationOperations json bug and MetadataStore related compile error
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/e73e35da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/e73e35da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/e73e35da
Branch: refs/heads/develop
Commit: e73e35da0fa778564fb23582478541fe390f2a0a
Parents: e21b073
Author: Chen, Hao <hc...@ebay.com>
Authored: Fri Jul 22 17:27:28 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Sat Jul 23 00:50:40 2016 +0800
----------------------------------------------------------------------
.../apache/eagle/app/AbstractApplication.java | 4 +-
.../java/org/apache/eagle/app/Application.java | 2 +-
.../apache/eagle/app/ApplicationContext.java | 217 +++++++++++++++----
.../eagle/app/ApplicationGuiceModule.java | 4 +-
.../eagle/app/ApplicationLifecycleListener.java | 39 ++++
.../java/org/apache/eagle/app/package-info.java | 29 +++
.../eagle/app/resource/ApplicationResource.java | 14 +-
.../apache/eagle/app/service/AppOperations.java | 161 --------------
.../service/ApplicationManagementService.java | 8 +-
.../ApplicationManagementServiceImpl.java | 96 --------
.../app/service/ApplicationOperations.java | 189 ++++++++++++++++
.../app/service/ApplicationProviderLoader.java | 4 +-
.../service/ApplicationProviderServiceImpl.java | 87 --------
.../impl/ApplicationManagementServiceImpl.java | 98 +++++++++
.../impl/ApplicationProviderConfigLoader.java | 128 +++++++++++
.../impl/ApplicationProviderSPILoader.java | 89 ++++++++
.../impl/ApplicationProviderServiceImpl.java | 89 ++++++++
.../loader/ApplicationProviderConfigLoader.java | 128 -----------
.../loader/ApplicationProviderSPILoader.java | 89 --------
.../eagle/app/sink/AbstractStreamSink.java | 74 +++++++
.../apache/eagle/app/sink/KafkaStreamSink.java | 39 ++--
.../eagle/app/sink/LoggingStreamSink.java | 18 +-
.../org/apache/eagle/app/sink/StreamSink.java | 125 ++++-------
.../app/sink/mapper/DirectEventMapper.java | 19 ++
.../mapper/FieldIndexDirectEventMapper.java | 40 ++++
.../sink/mapper/FieldNameDirectEventMapper.java | 40 ++++
.../app/sink/mapper/FlattenEventMapper.java | 78 +++++++
.../app/sink/mapper/StreamEventMapper.java | 33 +++
.../app/sink/mapper/TimestampSelector.java | 25 +++
.../app/spi/AbstractApplicationProvider.java | 6 +-
.../org/apache/eagle/app/test/AppSimulator.java | 51 -----
.../apache/eagle/app/test/AppSimulatorImpl.java | 8 +-
.../eagle/app/test/AppTestGuiceModule.java | 4 +-
.../eagle/app/test/ApplicationSimulator.java | 70 ++++++
.../src/main/resources/applications.xml | 42 ----
.../app/ApplicationProviderServiceTest.java | 3 +-
.../apache/eagle/app/TestApplicationImpl.java | 38 ++--
.../eagle/app/TestApplicationTestSuite.java | 14 +-
.../test/resources/TestApplicationMetadata.xml | 8 +
.../src/test/resources/application.conf | 4 +-
.../eagle-metadata/eagle-metadata-base/pom.xml | 4 +
.../eagle/metadata/model/ApplicationEntity.java | 2 +-
.../eagle/metadata/model/StreamSinkDesc.java | 40 ++++
.../persistence/MemoryMetadataStore.java | 33 ---
.../metadata/persistence/MetadataStore.java | 4 +-
.../service/memory/MemoryMetadataStore.java | 32 +++
eagle-examples/eagle-app-example/pom.xml | 6 -
.../eagle/app/example/ExampleApplication.java | 36 ++-
.../app/example/ExampleApplicationProvider.java | 5 +-
.../example/ExampleApplicationProvider2.java | 2 +-
.../eagle/app/example/RandomEventSpout.java | 59 -----
.../resources/ExampleApplicationMetadata.xml | 8 +
.../META-INF/apps/example/metadata.xml | 109 ++++++++++
.../app/example/ExampleApplicationTest.java | 28 ++-
.../src/test/java/resources/application.conf | 58 +++++
.../apache/eagle/server/ServerApplication.java | 10 +-
.../org/apache/eagle/server/ServerConfig.java | 22 +-
.../src/main/resources/application.conf | 9 +-
.../src/main/resources/providers-disabled.xml | 27 +++
eagle-server/src/main/resources/providers.xml | 27 ---
eagle-server/src/main/webapp/WEB-INF/web.xml | 88 ++++++++
pom.xml | 11 +
62 files changed, 1821 insertions(+), 1013 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
index 1cb2625..5001a80 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
@@ -61,7 +61,7 @@ public abstract class AbstractApplication implements Application,Serializable {
String topologyName = context.getAppEntity().getAppId();
TopologyBuilder builder = new TopologyBuilder();
- buildTopology(builder,context);
+ buildApp(builder,context);
StormTopology topology = builder.createTopology();
Config conf = getClusterStormConfig(context);
if(appEntity.getMode() == ApplicationEntity.Mode.CLUSTER){
@@ -117,7 +117,7 @@ public abstract class AbstractApplication implements Application,Serializable {
return conf;
}
- protected abstract void buildTopology(TopologyBuilder builder, ApplicationContext context);
+ protected abstract void buildApp(TopologyBuilder builder, ApplicationContext context);
@Override
public void stop(ApplicationContext context) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
index ffe5339..4d35e08 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
@@ -17,7 +17,7 @@
package org.apache.eagle.app;
/**
- * Application Lifecycle Interface
+ * Application Execution Interface
*/
public interface Application {
/**
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
index 1723250..dc2a456 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
@@ -18,65 +18,80 @@ package org.apache.eagle.app;
import com.typesafe.config.Config;
import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.app.sink.AbstractStreamSink;
import org.apache.eagle.app.sink.StreamSink;
+import org.apache.eagle.app.sink.mapper.*;
+import org.apache.eagle.app.spi.ApplicationProvider;
import org.apache.eagle.metadata.model.ApplicationEntity;
import org.apache.eagle.metadata.model.StreamDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Serializable;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
/**
- * Application Execution Context
+ * Application Context Interface: org.apache.eagle.app.ApplicationContext
+ *
+ * <ul>
+ * <li>Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity</li>
+ * <li>Application Processing Logic (Execution): org.apache.eagle.app.Application</li>
+ * <li>Application Lifecycle Listener (Callback): org.apache.eagle.app.ApplicationLifecycleListener</li>
+ * </ul>
*/
-public class ApplicationContext implements Serializable {
+public class ApplicationContext implements Serializable, ApplicationLifecycleListener{
private final Config envConfig;
private final ApplicationEntity appEntity;
private final static Logger LOG = LoggerFactory.getLogger(ApplicationContext.class);
- private final Map<String, StreamSink> streamSinkMap;
+ private final Map<String, StreamDefinition> streamDefinitionMap;
private final Map<String,StreamDesc> streamDescMap;
+ private final List<ApplicationLifecycleListener> applicationLifecycleListeners;
+ private final ApplicationProvider appProvider;
- public ApplicationContext(ApplicationEntity appEntity, Config envConfig){
+ /**
+ * @param appEntity ApplicationEntity
+ * @param appProvider ApplicationProvider
+ * @param envConfig Config
+ */
+ public ApplicationContext(ApplicationEntity appEntity, ApplicationProvider appProvider, Config envConfig){
this.appEntity = appEntity;
this.envConfig = envConfig;
- this.streamSinkMap = new HashMap<>();
+ this.appProvider = appProvider;
+ this.streamDefinitionMap = new HashMap<>();
this.streamDescMap = new HashMap<>();
-
- // TODO: Decouple out of ApplicationContext constructor
+ this.applicationLifecycleListeners = new LinkedList<>();
doInit();
}
+ public void registerListener(ApplicationLifecycleListener listener){
+ applicationLifecycleListeners.add(listener);
+ }
+
+ public List<ApplicationLifecycleListener> getListeners(){
+ return applicationLifecycleListeners;
+ }
+
private void doInit() {
- try {
- Class<?> sinkClass = appEntity.getDescriptor().getSinkClass();
- List<StreamDefinition> outputStreams = appEntity.getDescriptor().getStreams();
- if(null != outputStreams){
- Constructor constructor = sinkClass.getConstructor(StreamDefinition.class,ApplicationContext.class);
- outputStreams.forEach((stream) -> {
- try {
- StreamSink streamSink = (StreamSink) constructor.newInstance(stream,this);
- streamSinkMap.put(stream.getStreamId(), streamSink);
- StreamDesc streamDesc = new StreamDesc();
- streamDesc.setStreamSchema(stream);
- streamDesc.setSinkContext(streamSink.getSinkContext());
- streamDesc.setSinkType(sinkClass);
- streamDesc.setStreamId(stream.getStreamId());
- streamDescMap.put(streamDesc.getStreamId(),streamDesc);
- } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
- LOG.error("Failed to initialize instance "+sinkClass.getCanonicalName()+" for application: {}",this.getAppEntity());
- throw new RuntimeException("Failed to initialize instance "+sinkClass.getCanonicalName()+" for application:"+this.getAppEntity(),e);
- }
- });
- }
- } catch (NoSuchMethodException e) {
- LOG.error(e.getMessage(),e);
- throw new RuntimeException(e.getMessage(),e.getCause());
+ Class<?> sinkClass = appEntity.getDescriptor().getSinkClass();
+ List<StreamDefinition> outputStreams = appEntity.getDescriptor().getStreams();
+ if(null != outputStreams){
+ outputStreams.forEach((stream) -> {
+ try {
+ StreamSink streamSink = (StreamSink) sinkClass.newInstance();
+ streamSink.init(stream,this);
+ StreamDesc streamDesc = new StreamDesc();
+ streamDesc.setStreamSchema(stream);
+ streamDesc.setSinkContext(streamSink.getSinkContext());
+ streamDesc.setSinkType(sinkClass);
+ streamDesc.setStreamId(stream.getStreamId());
+ streamDescMap.put(streamDesc.getStreamId(),streamDesc);
+ streamDefinitionMap.put(streamDesc.getStreamId(),stream);
+ registerListener(streamSink);
+ } catch (InstantiationException | IllegalAccessException e) {
+ LOG.error("Failed to initialize instance "+sinkClass.getCanonicalName()+" for application: {}",this.getAppEntity());
+ throw new RuntimeException("Failed to initialize instance "+sinkClass.getCanonicalName()+" for application:"+this.getAppEntity(),e);
+ }
+ });
}
}
@@ -94,15 +109,133 @@ public class ApplicationContext implements Serializable {
* @param streamId
* @return
*/
- public StreamSink getStreamSink(String streamId){
- if(streamSinkMap.containsKey(streamId)) {
- return streamSinkMap.get(streamId);
- } else {
- throw new IllegalStateException("Stream (ID: "+streamId+") was not declared in "+this.appEntity.getDescriptor().getProviderClass().getCanonicalName()+" before being called");
+ public StreamSink getFlattenStreamSink(String streamId, StreamEventMapper mapper){
+ checkStreamExists(streamId);
+ Class<?> sinkClass = appEntity.getDescriptor().getSinkClass();
+ try {
+ AbstractStreamSink abstractStreamSink = (AbstractStreamSink) sinkClass.newInstance();
+ abstractStreamSink.setEventMapper(mapper);
+ abstractStreamSink.init(streamDefinitionMap.get(streamId),this);
+ return abstractStreamSink;
+ } catch (InstantiationException | IllegalAccessException e) {
+ LOG.error("Failed to instantiate "+sinkClass,e);
+ throw new IllegalStateException("Failed to instantiate "+sinkClass,e);
+ }
+ }
+
+ /**
+ * Make sure streamId is declared in Application Providers
+ *
+ * @param streamId
+ * @return
+ */
+ public StreamSink getDirectStreamSink(String streamId, String ... fieldNames){
+ return getFlattenStreamSink(streamId,new FieldNameDirectEventMapper(fieldNames));
+ }
+
+ /**
+ * Make sure streamId is declared in Application Providers
+ *
+ * @param streamId
+ * @return
+ */
+ public StreamSink getDirectStreamSink(String streamId, int ... fieldIndexs){
+ return getFlattenStreamSink(streamId,new FieldIndexDirectEventMapper(fieldIndexs));
+ }
+
+ /**
+ * Make sure streamId is declared in Application Providers
+ *
+ * @param streamId
+ * @return
+ */
+ public StreamSink getFlattenStreamSink(String streamId, TimestampSelector timestampSelector){
+ checkStreamExists(streamId);
+ return getFlattenStreamSink(streamId,new FlattenEventMapper(streamDefinitionMap.get(streamId),timestampSelector));
+ }
+
+ /**
+ * Make sure streamId is declared in Application Providers
+ *
+ * @param streamId
+ * @return
+ */
+ public StreamSink getFlattenStreamSink(String streamId, String timestampField){
+ checkStreamExists(streamId);
+ return getFlattenStreamSink(streamId,new FlattenEventMapper(streamDefinitionMap.get(streamId),timestampField));
+ }
+
+ /**
+ * Make sure streamId is declared in Application Providers
+ *
+ * @param streamId
+ * @return
+ */
+ public StreamSink getFlattenStreamSink(String streamId){
+ checkStreamExists(streamId);
+ return getFlattenStreamSink(streamId,new FlattenEventMapper(streamDefinitionMap.get(streamId)));
+ }
+
+ private void checkStreamExists(String streamId){
+ if(! streamDefinitionMap.containsKey(streamId)){
+ LOG.error("Stream [streamId = "+streamId+"] is not defined in "
+ + appEntity.getDescriptor().getProviderClass().getCanonicalName());
+ throw new IllegalStateException("Stream [streamId = "+streamId+"] is not defined in "
+ + appEntity.getDescriptor().getProviderClass().getCanonicalName());
}
}
public Collection<StreamDesc> getStreamSinkDescs(){
return streamDescMap.values();
}
+
+ @Override
+ public void onAppInstall() {
+ getListeners().forEach((listener)->{
+ try {
+ listener.onAppInstall();
+ }catch (Throwable throwable){
+ LOG.error("Failed to invoked onAppInstall of listener {}",listener.toString(),throwable);
+ throw throwable;
+ }
+ });
+ }
+
+ @Override
+ public void onAppUninstall() {
+ getListeners().forEach((listener)->{
+ try {
+ listener.onAppUninstall();
+ }catch (Throwable throwable){
+ LOG.error("Failed to invoked onAppUninstall of listener {}",listener.toString(),throwable);
+ throw throwable;
+ }
+ });
+ }
+
+ @Override
+ public void onAppStart() {
+ getListeners().forEach((listener)->{
+ try {
+ listener.onAppStart();
+ }catch (Throwable throwable){
+ LOG.error("Failed to invoked onAppStart of listener {}",listener.toString(),throwable);
+ throw throwable;
+ }
+ });
+ appProvider.getApplication().start(this);
+ }
+
+ @Override
+ public void onAppStop() {
+ appProvider.getApplication().stop(this);
+ getListeners().forEach((listener)->{
+ try {
+ listener.onAppStop();
+ }catch (Throwable throwable){
+ LOG.error("Failed to invoked onAppStop of listener {}",listener.toString(),throwable);
+ throw throwable;
+ }
+ });
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationGuiceModule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationGuiceModule.java
index 9a66b6c..ca3b8f2 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationGuiceModule.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationGuiceModule.java
@@ -19,9 +19,9 @@ package org.apache.eagle.app;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
import org.apache.eagle.app.service.ApplicationManagementService;
-import org.apache.eagle.app.service.ApplicationManagementServiceImpl;
+import org.apache.eagle.app.service.impl.ApplicationManagementServiceImpl;
import org.apache.eagle.app.service.ApplicationProviderService;
-import org.apache.eagle.app.service.ApplicationProviderServiceImpl;
+import org.apache.eagle.app.service.impl.ApplicationProviderServiceImpl;
import org.apache.eagle.metadata.service.ApplicationDescService;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java
new file mode 100644
index 0000000..6ced960
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+public interface ApplicationLifecycleListener {
+ /**
+ * Callback invoked when the application is installed.
+ */
+ void onAppInstall();
+
+ /**
+ * Callback invoked when the application is uninstalled.
+ */
+ void onAppUninstall();
+
+ /**
+ * Callback for application start; ApplicationContext forwards it to every registered listener before calling Application#start.
+ */
+ void onAppStart();
+
+ /**
+ * Callback for application stop; ApplicationContext calls Application#stop first, then forwards it to every registered listener.
+ */
+ void onAppStop();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
new file mode 100644
index 0000000..8f1fcb9
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ * <h1>Application Management Framework Interfaces</h1>
+ *
+ * <ul>
+ * <li>Application Context (Runtime): org.apache.eagle.app.ApplicationContext</li>
+ * <li>Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity</li>
+ * <li>Application Processing Logic (Execution): org.apache.eagle.app.Application</li>
+ * <li>Application Lifecycle Listener (Callback): org.apache.eagle.app.ApplicationLifecycleListener</li>
+ * </ul>
+ */
+package org.apache.eagle.app;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
index a0654ff..b0e9988 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
@@ -19,7 +19,7 @@ package org.apache.eagle.app.resource;
import com.google.inject.Inject;
import org.apache.eagle.app.service.ApplicationManagementService;
-import org.apache.eagle.app.service.AppOperations;
+import org.apache.eagle.app.service.ApplicationOperations;
import org.apache.eagle.app.service.ApplicationProviderService;
import org.apache.eagle.metadata.model.ApplicationDesc;
import org.apache.eagle.metadata.model.ApplicationEntity;
@@ -59,7 +59,7 @@ public class ApplicationResource {
return providerService.getApplicationDescByType(type);
}
- @POST
+ @PUT
@Path("/providers/reload")
@Produces(MediaType.APPLICATION_JSON)
public Collection<ApplicationDesc> reloadApplicationDescs(){
@@ -97,7 +97,7 @@ public class ApplicationResource {
@Path("/install")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
- public ApplicationEntity installApplication(AppOperations.InstallOperation operation){
+ public ApplicationEntity installApplication(ApplicationOperations.InstallOperation operation){
return applicationManagementService.install(operation);
}
@@ -111,11 +111,11 @@ public class ApplicationResource {
*
* @param operation
*/
- @POST
+ @DELETE
@Path("/uninstall")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
- public ApplicationEntity uninstallApplication(AppOperations.UninstallOperation operation){
+ public ApplicationEntity uninstallApplication(ApplicationOperations.UninstallOperation operation){
return applicationManagementService.uninstall(operation);
}
@@ -132,7 +132,7 @@ public class ApplicationResource {
@Path("/start")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
- public ApplicationEntity startApplication(AppOperations.StartOperation operation){
+ public ApplicationEntity startApplication(ApplicationOperations.StartOperation operation){
return applicationManagementService.start(operation);
}
@@ -149,7 +149,7 @@ public class ApplicationResource {
@Path("/stop")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
- public ApplicationEntity stopApplication(AppOperations.StopOperation operation){
+ public ApplicationEntity stopApplication(ApplicationOperations.StopOperation operation){
return applicationManagementService.stop(operation);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/AppOperations.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/AppOperations.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/AppOperations.java
deleted file mode 100644
index f2395d6..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/AppOperations.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service;
-
-import org.apache.eagle.metadata.model.ApplicationEntity;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public class AppOperations {
- interface Operation extends Serializable {
- //
- }
-
- public static class InstallOperation implements Operation{
- private String siteId;
- private String appType;
- private ApplicationEntity.Mode mode = ApplicationEntity.Mode.LOCAL;
- private Map<String,Object> configuration;
-
- public InstallOperation(){}
- public InstallOperation(String siteId,String appType){
- this.setSiteId(siteId);
- this.setAppType(appType);
- }
- public InstallOperation(String siteId,String appType,ApplicationEntity.Mode mode){
- this.setSiteId(siteId);
- this.setAppType(appType);
- this.setMode(mode);
- }
- public InstallOperation(String siteId,String appType,ApplicationEntity.Mode mode,Map<String,Object> configuration){
- this.setSiteId(siteId);
- this.setAppType(appType);
- this.setMode(mode);
- this.setConfiguration(configuration);
- }
-
- public String getSiteId() {
- return siteId;
- }
- public void setSiteId(String siteId) {
- this.siteId = siteId;
- }
- public String getAppType() {
- return appType;
- }
- public void setAppType(String appType) {
- this.appType = appType;
- }
-
- public Map<String, Object> getConfiguration() {
- return configuration;
- }
-
- public void setConfiguration(Map<String, Object> configuration) {
- this.configuration = configuration;
- }
-
- public ApplicationEntity.Mode getMode() {
- return mode;
- }
-
- public void setMode(ApplicationEntity.Mode mode) {
- this.mode = mode;
- }
- }
-
- public static class UninstallOperation implements Operation{
- private String uuid;
- private String appId;
- public UninstallOperation(String uuid){
- this.setUuid(uuid);
- }
- public UninstallOperation(String uuid,String appId){
- this.setUuid(uuid);
- this.setAppId(appId);
- }
-
- public String getUuid() {
- return uuid;
- }
- public void setUuid(String uuid) {
- this.uuid = uuid;
- }
-
- public String getAppId() {
- return appId;
- }
-
- public void setAppId(String appId) {
- this.appId = appId;
- }
- }
-
- public static class StartOperation implements Operation{
- private String uuid;
- private String appId;
- public StartOperation(String uuid){
- this.setUuid(uuid);
- }
- public StartOperation(String uuid,String appId){
- this.setUuid(uuid);
- this.setAppId(appId);
- }
- public String getUuid() {
- return uuid;
- }
- public void setUuid(String uuid) {
- this.uuid = uuid;
- }
-
- public String getAppId() {
- return appId;
- }
-
- public void setAppId(String appId) {
- this.appId = appId;
- }
- }
-
- public static class StopOperation implements Operation{
- private String uuid;
- private String appId;
-
- public StopOperation(String uuid){
- this.setUuid(uuid);
- }
- public StopOperation(String uuid,String appId){
- this.setUuid(uuid);
- this.setAppId(appId);
- }
- public String getUuid() {
- return uuid;
- }
- public void setUuid(String uuid) {
- this.uuid = uuid;
- }
-
- public String getAppId() {
- return appId;
- }
-
- public void setAppId(String appId) {
- this.appId = appId;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementService.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementService.java
index 965a3fa..30ae0b9 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementService.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementService.java
@@ -24,26 +24,26 @@ public interface ApplicationManagementService {
* @param operation
* @return
*/
- ApplicationEntity install(AppOperations.InstallOperation operation);
+ ApplicationEntity install(ApplicationOperations.InstallOperation operation);
/**
*
* @param operation
* @return
*/
- ApplicationEntity uninstall(AppOperations.UninstallOperation operation);
+ ApplicationEntity uninstall(ApplicationOperations.UninstallOperation operation);
/**
*
* @param operation
* @return
*/
- ApplicationEntity start(AppOperations.StartOperation operation);
+ ApplicationEntity start(ApplicationOperations.StartOperation operation);
/**
*
* @param operation
* @return
*/
- ApplicationEntity stop(AppOperations.StopOperation operation);
+ ApplicationEntity stop(ApplicationOperations.StopOperation operation);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementServiceImpl.java
deleted file mode 100644
index 566fb62..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementServiceImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.ApplicationContext;
-import org.apache.eagle.metadata.model.ApplicationDesc;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.metadata.model.SiteEntity;
-import org.apache.eagle.metadata.service.ApplicationEntityService;
-import org.apache.eagle.metadata.service.SiteEntityService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Singleton
-public class ApplicationManagementServiceImpl implements ApplicationManagementService {
- private final SiteEntityService siteEntityService;
- private final ApplicationProviderService applicationProviderService;
- private final ApplicationEntityService applicationEntityService;
- private final Config config;
- private final static Logger LOGGER = LoggerFactory.getLogger(ApplicationManagementServiceImpl.class);
-
- @Inject
- public ApplicationManagementServiceImpl(
- Config config,
- SiteEntityService siteEntityService,
- ApplicationProviderService applicationProviderService,
- ApplicationEntityService applicationEntityService){
- this.config = config;
- this.siteEntityService = siteEntityService;
- this.applicationProviderService = applicationProviderService;
- this.applicationEntityService = applicationEntityService;
- }
-
- public ApplicationEntity install(AppOperations.InstallOperation operation) {
- Preconditions.checkNotNull(operation.getSiteId(),"siteId is null");
- Preconditions.checkNotNull(operation.getAppType(),"appType is null");
- SiteEntity siteEntity = siteEntityService.getBySiteId(operation.getSiteId());
- Preconditions.checkNotNull(siteEntity,"Site with ID: "+operation.getSiteId()+" is not found");
- ApplicationDesc appDesc = applicationProviderService.getApplicationDescByType(operation.getAppType());
- Preconditions.checkNotNull("Application with TYPE: "+operation.getAppType()+" is not found");
- ApplicationEntity applicationEntity = new ApplicationEntity();
- applicationEntity.setDescriptor(appDesc);
- applicationEntity.setSite(siteEntity);
- applicationEntity.setConfiguration(operation.getConfiguration());
- applicationEntity.setMode(operation.getMode());
- ApplicationContext applicationContext = new ApplicationContext(applicationEntity,config);
- applicationEntity.setStreams(applicationContext.getStreamSinkDescs());
- applicationEntityService.create(applicationEntity);
- return applicationEntity;
- }
-
- public ApplicationEntity uninstall(AppOperations.UninstallOperation operation) {
- ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
- Application application = applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication();
- // TODO: Check status, skip stop if already STOPPED
- try {
- application.stop(new ApplicationContext(applicationEntity, this.config));
- }catch (Throwable throwable){
- LOGGER.error(throwable.getMessage(),throwable);
- }
- return applicationEntityService.delete(applicationEntity);
- }
-
- public ApplicationEntity start(AppOperations.StartOperation operation) {
- ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
- Application application = applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication();
- application.start(new ApplicationContext(applicationEntity,this.config));
- return applicationEntity;
- }
-
- public ApplicationEntity stop(AppOperations.StopOperation operation) {
- ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
- Application application = applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication();
- application.stop(new ApplicationContext(applicationEntity,this.config));
- return applicationEntity;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperations.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperations.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperations.java
new file mode 100644
index 0000000..792b7d4
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperations.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service;
+
+import org.apache.eagle.metadata.model.ApplicationEntity;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public final class ApplicationOperations {
+ interface Operation extends Serializable {
+ String getType();
+ }
+
+ private final static String INSTALL = "INSTALL";
+ private final static String UNINSTALL = "UNINSTALL";
+ private final static String START = "START";
+ private final static String STOP = "STOP";
+
+ public static class InstallOperation implements Operation{
+ private String siteId;
+ private String appType;
+ private ApplicationEntity.Mode mode = ApplicationEntity.Mode.LOCAL;
+ private Map<String,Object> configuration;
+
+ public InstallOperation(){}
+ public InstallOperation(String siteId,String appType){
+ this.setSiteId(siteId);
+ this.setAppType(appType);
+ }
+ public InstallOperation(String siteId,String appType,ApplicationEntity.Mode mode){
+ this.setSiteId(siteId);
+ this.setAppType(appType);
+ this.setMode(mode);
+ }
+ public InstallOperation(String siteId,String appType,ApplicationEntity.Mode mode,Map<String,Object> configuration){
+ this.setSiteId(siteId);
+ this.setAppType(appType);
+ this.setMode(mode);
+ this.setConfiguration(configuration);
+ }
+
+ public String getSiteId() {
+ return siteId;
+ }
+ public void setSiteId(String siteId) {
+ this.siteId = siteId;
+ }
+ public String getAppType() {
+ return appType;
+ }
+ public void setAppType(String appType) {
+ this.appType = appType;
+ }
+
+ public Map<String, Object> getConfiguration() {
+ return configuration;
+ }
+
+ public void setConfiguration(Map<String, Object> configuration) {
+ this.configuration = configuration;
+ }
+
+ public ApplicationEntity.Mode getMode() {
+ return mode;
+ }
+
+ public void setMode(ApplicationEntity.Mode mode) {
+ this.mode = mode;
+ }
+
+ @Override
+ public String getType() {
+ return INSTALL;
+ }
+ }
+
+ public static class UninstallOperation implements Operation{
+ private String uuid;
+ private String appId;
+ public UninstallOperation(){}
+ public UninstallOperation(String uuid){
+ this.setUuid(uuid);
+ }
+ public UninstallOperation(String uuid,String appId){
+ this.setUuid(uuid);
+ this.setAppId(appId);
+ }
+
+ public String getUuid() {
+ return uuid;
+ }
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+
+ @Override
+ public String getType() {
+ return UNINSTALL;
+ }
+ }
+
+ public static class StartOperation implements Operation{
+ private String uuid;
+ private String appId;
+ public StartOperation(){}
+ public StartOperation(String uuid){
+ this.setUuid(uuid);
+ }
+ public StartOperation(String uuid,String appId){
+ this.setUuid(uuid);
+ this.setAppId(appId);
+ }
+ public String getUuid() {
+ return uuid;
+ }
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+
+ @Override
+ public String getType() {
+ return START;
+ }
+ }
+
+ public static class StopOperation implements Operation{
+ private String uuid;
+ private String appId;
+
+ public StopOperation(){}
+ public StopOperation(String uuid){
+ this.setUuid(uuid);
+ }
+ public StopOperation(String uuid,String appId){
+ this.setUuid(uuid);
+ this.setAppId(appId);
+ }
+ public String getUuid() {
+ return uuid;
+ }
+ public void setUuid(String uuid) {
+ this.uuid = uuid;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public void setAppId(String appId) {
+ this.appId = appId;
+ }
+
+ @Override
+ public String getType() {
+ return STOP;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderLoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderLoader.java
index 8b98404..85f0709 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderLoader.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderLoader.java
@@ -17,8 +17,8 @@
package org.apache.eagle.app.service;
import com.typesafe.config.Config;
-import org.apache.eagle.app.service.loader.ApplicationProviderConfigLoader;
-import org.apache.eagle.app.service.loader.ApplicationProviderSPILoader;
+import org.apache.eagle.app.service.impl.ApplicationProviderConfigLoader;
+import org.apache.eagle.app.service.impl.ApplicationProviderSPILoader;
import org.apache.eagle.app.spi.ApplicationProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderServiceImpl.java
deleted file mode 100644
index 6ce463f..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderServiceImpl.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.spi.ApplicationProvider;
-import org.apache.eagle.metadata.model.ApplicationDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-/**
- * Support to load application provider from application.provider.config = "providers.xml" configuration file
- * or application.provider.dir = "lib/apps" with SPI Class loader
- *
- * TODO: hot-manage application provider loading
- */
-@Singleton
-public class ApplicationProviderServiceImpl implements ApplicationProviderService {
- private final Config config;
- private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderServiceImpl.class);
- private final ApplicationProviderLoader appProviderLoader;
- public final static String APP_PROVIDER_LOADER_CLASS_KEY = "application.provider.loader";
-
- @Inject
- public ApplicationProviderServiceImpl(Config config){
- LOG.info("Initializing {}",this.getClass().getCanonicalName());
- this.config = config;
- String appProviderLoaderClass = this.config.hasPath(APP_PROVIDER_LOADER_CLASS_KEY)?
- this.config.getString(APP_PROVIDER_LOADER_CLASS_KEY):ApplicationProviderLoader.getDefaultAppProviderLoader();
- LOG.info("Initializing {} = {}",APP_PROVIDER_LOADER_CLASS_KEY,appProviderLoaderClass);
- appProviderLoader = initializeAppProviderLoader(appProviderLoaderClass);
- LOG.info("Initialized {}",appProviderLoader);
- reload();
- }
-
- private ApplicationProviderLoader initializeAppProviderLoader(String appProviderLoaderClass){
- try {
- return (ApplicationProviderLoader) Class.forName(appProviderLoaderClass).getConstructor(Config.class).newInstance(this.config);
- } catch (Throwable e) {
- LOG.error("Failed to initialize ApplicationProviderLoader: "+appProviderLoaderClass,e);
- throw new IllegalStateException("Failed to initialize ApplicationProviderLoader: "+appProviderLoaderClass,e);
- }
- }
-
- public synchronized void reload(){
- appProviderLoader.reset();
- LOG.info("Loading application providers ...");
- appProviderLoader.load();
- LOG.info("Loaded {} application providers",appProviderLoader.getProviders().size());
- }
-
- public Collection<ApplicationProvider> getProviders(){
- return appProviderLoader.getProviders();
- }
-
- public Collection<ApplicationDesc> getApplicationDescs(){
- return getProviders().stream().map(ApplicationProvider::getApplicationDesc).collect(Collectors.toList());
- }
-
- public ApplicationProvider<?> getApplicationProviderByType(String type) {
- return appProviderLoader.getApplicationProviderByType(type);
- }
-
- @Deprecated
- public ApplicationDesc getApplicationDescByType(String appType) {
- return appProviderLoader.getApplicationProviderByType(appType).getApplicationDesc();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
new file mode 100644
index 0000000..9e34459
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.ApplicationContext;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.service.ApplicationManagementService;
+import org.apache.eagle.app.service.ApplicationProviderService;
+import org.apache.eagle.metadata.model.ApplicationDesc;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.SiteEntity;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+import org.apache.eagle.metadata.service.SiteEntityService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Singleton
+public class ApplicationManagementServiceImpl implements ApplicationManagementService {
+ private final SiteEntityService siteEntityService;
+ private final ApplicationProviderService applicationProviderService;
+ private final ApplicationEntityService applicationEntityService;
+ private final Config config;
+ private final static Logger LOGGER = LoggerFactory.getLogger(ApplicationManagementServiceImpl.class);
+
+ @Inject
+ public ApplicationManagementServiceImpl(
+ Config config,
+ SiteEntityService siteEntityService,
+ ApplicationProviderService applicationProviderService,
+ ApplicationEntityService applicationEntityService){
+ this.config = config;
+ this.siteEntityService = siteEntityService;
+ this.applicationProviderService = applicationProviderService;
+ this.applicationEntityService = applicationEntityService;
+ }
+
+ public ApplicationEntity install(ApplicationOperations.InstallOperation operation) {
+ Preconditions.checkNotNull(operation.getSiteId(),"siteId is null");
+ Preconditions.checkNotNull(operation.getAppType(),"appType is null");
+ SiteEntity siteEntity = siteEntityService.getBySiteId(operation.getSiteId());
+ Preconditions.checkNotNull(siteEntity,"Site with ID: "+operation.getSiteId()+" is not found");
+ ApplicationDesc appDesc = applicationProviderService.getApplicationDescByType(operation.getAppType());
+ Preconditions.checkNotNull(appDesc,"Application with TYPE: "+operation.getAppType()+" is not found"); // check the descriptor itself; the one-arg form only null-checked the message string
+ ApplicationEntity applicationEntity = new ApplicationEntity();
+ applicationEntity.setDescriptor(appDesc);
+ applicationEntity.setSite(siteEntity);
+ applicationEntity.setConfiguration(operation.getConfiguration());
+ applicationEntity.setMode(operation.getMode());
+ ApplicationContext applicationContext = new ApplicationContext(applicationEntity,applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()),config);
+ applicationEntity.setStreams(applicationContext.getStreamSinkDescs());
+ applicationContext.onAppInstall();
+ applicationEntityService.create(applicationEntity);
+ return applicationEntity;
+ }
+
+ public ApplicationEntity uninstall(ApplicationOperations.UninstallOperation operation) {
+ ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
+ ApplicationContext applicationContext = new ApplicationContext(applicationEntity,applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()),config);
+ // TODO: Check status, skip stop if already STOPPED
+ try {
+ applicationContext.onAppStop();
+ }catch (Throwable throwable){
+ LOGGER.error(throwable.getMessage(),throwable);
+ }
+ applicationContext.onAppUninstall();
+ return applicationEntityService.delete(applicationEntity);
+ }
+
+ public ApplicationEntity start(ApplicationOperations.StartOperation operation) {
+ ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
+ new ApplicationContext(applicationEntity,applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()),this.config).onAppStart();
+ return applicationEntity;
+ }
+
+ public ApplicationEntity stop(ApplicationOperations.StopOperation operation) {
+ ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
+ new ApplicationContext(applicationEntity,applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()),this.config).onAppStop();
+ return applicationEntity;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java
new file mode 100644
index 0000000..3b2bca9
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service.impl;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.config.ApplicationProviderConfig;
+import org.apache.eagle.app.config.ApplicationProvidersConfig;
+import org.apache.eagle.app.service.ApplicationProviderLoader;
+import org.apache.eagle.app.spi.ApplicationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * Loads application providers listed in an XML configuration resource.
+ *
+ * The resource name is taken from the "application.provider.config" config key and
+ * falls back to {@link #DEFAULT_APPLICATIONS_CONFIG_FILE}. The resource is looked up
+ * relative to this class first and then from the classpath root.
+ */
+public class ApplicationProviderConfigLoader extends ApplicationProviderLoader {
+    public final static String DEFAULT_APPLICATIONS_CONFIG_FILE = "providers.xml";
+    private final static String APPLICATIONS_CONFIG_PROPS_KEY = "application.provider.config";
+    private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderConfigLoader.class);
+
+    public ApplicationProviderConfigLoader(Config config) {
+        super(config);
+    }
+
+    /**
+     * Reads the provider configuration and initializes every listed provider.
+     * A provider that fails to initialize is logged and skipped; the others are still loaded.
+     */
+    @Override
+    public void load() {
+        List<ApplicationProviderConfig> applicationProviderConfigs = loadProvidersFromProvidersConf();
+        int totalCount = applicationProviderConfigs.size();
+        int loadedCount = 0, failedCount = 0;
+        for (ApplicationProviderConfig providerConfig : applicationProviderConfigs) {
+            try {
+                initializeProvider(providerConfig);
+                loadedCount++;
+            } catch (Throwable ex) {
+                LOG.warn("Failed to initialized {}, ignored", providerConfig, ex);
+                failedCount++;
+            }
+        }
+        LOG.info("Loaded {} app providers (total: {}, failed: {})", loadedCount, totalCount, failedCount);
+    }
+
+    /**
+     * Tells whether the given provider configuration resource can be found.
+     *
+     * @param applicationConfFile resource name, tried as given and then with a leading "/"
+     * @return true when the resource could be opened, false otherwise
+     */
+    public static boolean appProviderConfExists(String applicationConfFile) {
+        InputStream is = openConfigResource(applicationConfFile);
+        if (is == null) {
+            return false;
+        }
+        try {
+            is.close();
+        } catch (IOException e) {
+            // The resource exists; a failure to close the probe stream does not change the answer.
+            LOG.debug(e.getMessage());
+        }
+        return true;
+    }
+
+    /**
+     * Opens the named resource relative to this class, falling back to the classpath root.
+     *
+     * @return the opened stream, or null when neither lookup finds the resource
+     */
+    private static InputStream openConfigResource(String resourceName) {
+        InputStream is = ApplicationProviderConfigLoader.class.getResourceAsStream(resourceName);
+        if (is == null) {
+            is = ApplicationProviderConfigLoader.class.getResourceAsStream("/" + resourceName);
+        }
+        return is;
+    }
+
+    /**
+     * Instantiates, prepares, validates and registers a single provider.
+     *
+     * @throws RuntimeException when class name or jar path is missing, or the class is not an ApplicationProvider
+     */
+    private void initializeProvider(ApplicationProviderConfig providerConfig) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        LOG.info("Loading application provider {} from {}", providerConfig.getClassName(), providerConfig.getJarPath());
+        String providerClassName = providerConfig.getClassName();
+        if (providerClassName == null) throw new RuntimeException("provider.classname is null: " + providerConfig);
+        if (providerConfig.getJarPath() == null) throw new RuntimeException("provider.jarpath is null: " + providerConfig);
+
+        // NOTE(review): the jar path is only checked for presence; the class itself is
+        // resolved through Class.forName, i.e. it must already be visible to this class's loader.
+        Class<?> providerClass = Class.forName(providerClassName);
+
+        if (!ApplicationProvider.class.isAssignableFrom(providerClass)) {
+            // Report the offending class name rather than the literal text "providerClassName".
+            throw new RuntimeException(providerClassName + " is not implementation of " + ApplicationProvider.class.getCanonicalName());
+        }
+        ApplicationProvider provider = (ApplicationProvider) providerClass.newInstance();
+        provider.prepare(providerConfig, this.getConfig());
+        Preconditions.checkNotNull(provider.getApplicationDesc(), "appDesc is null");
+        Preconditions.checkNotNull(provider.getApplicationDesc().getType(), "type is null");
+        registerProvider(provider);
+    }
+
+    /**
+     * Unmarshals the provider list from the configured XML resource.
+     *
+     * @return the provider configurations declared in the resource
+     * @throws RuntimeException when the resource is missing or cannot be unmarshalled
+     */
+    private List<ApplicationProviderConfig> loadProvidersFromProvidersConf() {
+        String providerConfigFile = DEFAULT_APPLICATIONS_CONFIG_FILE;
+        if (getConfig().hasPath(APPLICATIONS_CONFIG_PROPS_KEY)) {
+            providerConfigFile = getConfig().getString(APPLICATIONS_CONFIG_PROPS_KEY);
+            LOG.info("Set {} = {}", APPLICATIONS_CONFIG_PROPS_KEY, providerConfigFile);
+        }
+        InputStream is = null;
+        try {
+            JAXBContext jc = JAXBContext.newInstance(ApplicationProvidersConfig.class);
+            Unmarshaller unmarshaller = jc.createUnmarshaller();
+            is = openConfigResource(providerConfigFile);
+            if (is == null) {
+                LOG.error("Application provider configuration {} is not found", providerConfigFile);
+            }
+            Preconditions.checkNotNull(is, providerConfigFile + " is not found");
+            return ((ApplicationProvidersConfig) unmarshaller.unmarshal(is)).getProviders();
+        } catch (Exception ex) {
+            LOG.error("Failed to load application provider configuration: {}", providerConfigFile, ex);
+            throw new RuntimeException("Failed to load application provider configuration: " + providerConfigFile, ex);
+        } finally {
+            if (is != null) {
+                try {
+                    is.close();
+                } catch (IOException e) {
+                    LOG.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
new file mode 100644
index 0000000..3304286
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service.impl;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.config.ApplicationProviderConfig;
+import org.apache.eagle.app.service.ApplicationProviderLoader;
+import org.apache.eagle.app.spi.ApplicationProvider;
+import org.apache.eagle.app.tools.DynamicJarPathFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ServiceLoader;
+import java.util.function.Function;
+
+/**
+ * Discovers application providers through {@link ServiceLoader}.
+ *
+ * When the "application.provider.dir" config key is set, every *.jar file in that
+ * directory is scanned with its own {@link URLClassLoader}; otherwise the thread
+ * context class loader is scanned.
+ */
+public class ApplicationProviderSPILoader extends ApplicationProviderLoader{
+    private final String appProviderExtDir;
+    private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderSPILoader.class);
+    private final static String APPLICATIONS_DIR_PROPS_KEY = "application.provider.dir";
+
+    public ApplicationProviderSPILoader(Config config) {
+        super(config);
+        if (config.hasPath(APPLICATIONS_DIR_PROPS_KEY)) {
+            this.appProviderExtDir = config.getString(APPLICATIONS_DIR_PROPS_KEY);
+        } else {
+            this.appProviderExtDir = null;
+        }
+
+        LOG.info("Using {}: {}", APPLICATIONS_DIR_PROPS_KEY, this.appProviderExtDir);
+    }
+
+    /**
+     * Registers every provider found in the configured jar directory, or on the
+     * context class loader when no directory is configured. A jar that fails to
+     * load is logged and skipped.
+     */
+    @Override
+    public void load() {
+        if (appProviderExtDir != null) {
+            LOG.info("Loading application providers from class loader of jars in {}", appProviderExtDir);
+            File loc = new File(appProviderExtDir);
+            File[] jarFiles = loc.listFiles(file -> file.getPath().toLowerCase().endsWith(".jar"));
+            if (jarFiles == null) {
+                // File.listFiles returns null when the path is not a directory or cannot be read.
+                LOG.warn("{} is not a readable directory, no application providers loaded", appProviderExtDir);
+                return;
+            }
+            for (File jarFile : jarFiles) {
+                try {
+                    URL jarFileUrl = jarFile.toURI().toURL();
+                    LOG.debug("Loading ApplicationProvider from jar: {}", jarFileUrl.toString());
+                    // NOTE(review): this class loader is never closed; presumably it has to
+                    // outlive the providers registered from it - confirm.
+                    URLClassLoader jarFileClassLoader = new URLClassLoader(new URL[]{jarFileUrl});
+                    loadProviderFromClassLoader(jarFileClassLoader, (applicationProvider) -> jarFileUrl.getPath());
+                } catch (Exception e) {
+                    LOG.warn("Failed to load application provider from jar {}", jarFile, e);
+                }
+            }
+        } else {
+            LOG.info("Loading application providers from context class loader");
+            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+            // Resolve the jar from the provider's own class; resolving it from the
+            // ApplicationProviderConfig instance would always yield the jar that
+            // contains ApplicationProviderConfig, whatever the provider is.
+            loadProviderFromClassLoader(classLoader, (applicationProvider) -> DynamicJarPathFinder.findPath(applicationProvider.getClass()));
+        }
+    }
+
+    /**
+     * Prepares and registers every ApplicationProvider the given class loader exposes via ServiceLoader.
+     *
+     * @param jarFileClassLoader class loader to scan for provider services
+     * @param jarFileSupplier    maps a discovered provider to the jar path stored in its config
+     */
+    private void loadProviderFromClassLoader(ClassLoader jarFileClassLoader, Function<ApplicationProvider, String> jarFileSupplier) {
+        ServiceLoader<ApplicationProvider> serviceLoader = ServiceLoader.load(ApplicationProvider.class, jarFileClassLoader);
+        for (ApplicationProvider applicationProvider : serviceLoader) {
+            try {
+                ApplicationProviderConfig providerConfig = new ApplicationProviderConfig();
+                providerConfig.setClassName(applicationProvider.getClass().getCanonicalName());
+                providerConfig.setJarPath(jarFileSupplier.apply(applicationProvider));
+                applicationProvider.prepare(providerConfig, getConfig());
+                registerProvider(applicationProvider);
+            } catch (Throwable ex) {
+                // One broken provider must not prevent the remaining ones from registering.
+                LOG.warn("Failed to register application provider {}", applicationProvider, ex);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderServiceImpl.java
new file mode 100644
index 0000000..2297ea5
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderServiceImpl.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service.impl;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.service.ApplicationProviderLoader;
+import org.apache.eagle.app.service.ApplicationProviderService;
+import org.apache.eagle.app.spi.ApplicationProvider;
+import org.apache.eagle.metadata.model.ApplicationDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Singleton service exposing the application providers discovered by a pluggable
+ * {@link ApplicationProviderLoader}.
+ *
+ * The loader implementation is named by the "application.provider.loader" config key;
+ * when the key is absent, ApplicationProviderLoader.getDefaultAppProviderLoader() names it.
+ * Loaders may read providers from application.provider.config = "providers.xml" or from
+ * application.provider.dir = "lib/apps" with an SPI class loader.
+ *
+ * TODO: hot-manage application provider loading
+ */
+@Singleton
+public class ApplicationProviderServiceImpl implements ApplicationProviderService {
+    public final static String APP_PROVIDER_LOADER_CLASS_KEY = "application.provider.loader";
+    private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderServiceImpl.class);
+
+    private final Config config;
+    private final ApplicationProviderLoader appProviderLoader;
+
+    /**
+     * Picks the loader class, instantiates it with the given config and performs the first load.
+     */
+    @Inject
+    public ApplicationProviderServiceImpl(Config config){
+        LOG.info("Initializing {}",this.getClass().getCanonicalName());
+        this.config = config;
+
+        String loaderClassName;
+        if (this.config.hasPath(APP_PROVIDER_LOADER_CLASS_KEY)) {
+            loaderClassName = this.config.getString(APP_PROVIDER_LOADER_CLASS_KEY);
+        } else {
+            loaderClassName = ApplicationProviderLoader.getDefaultAppProviderLoader();
+        }
+        LOG.info("Initializing {} = {}",APP_PROVIDER_LOADER_CLASS_KEY,loaderClassName);
+
+        appProviderLoader = initializeAppProviderLoader(loaderClassName);
+        LOG.info("Initialized {}",appProviderLoader);
+        reload();
+    }
+
+    /**
+     * Reflectively builds the loader through its single-argument (Config) constructor.
+     *
+     * @throws IllegalStateException wrapping any failure to locate or instantiate the class
+     */
+    private ApplicationProviderLoader initializeAppProviderLoader(String appProviderLoaderClass){
+        try {
+            Class<?> loaderClass = Class.forName(appProviderLoaderClass);
+            Object loader = loaderClass.getConstructor(Config.class).newInstance(this.config);
+            return (ApplicationProviderLoader) loader;
+        } catch (Throwable e) {
+            LOG.error("Failed to initialize ApplicationProviderLoader: "+appProviderLoaderClass,e);
+            throw new IllegalStateException("Failed to initialize ApplicationProviderLoader: "+appProviderLoaderClass,e);
+        }
+    }
+
+    /**
+     * Resets the loader and loads all providers again.
+     */
+    public synchronized void reload(){
+        appProviderLoader.reset();
+        LOG.info("Loading application providers ...");
+        appProviderLoader.load();
+        int providerCount = appProviderLoader.getProviders().size();
+        LOG.info("Loaded {} application providers",providerCount);
+    }
+
+    /**
+     * @return the providers currently held by the loader
+     */
+    public Collection<ApplicationProvider> getProviders(){
+        return appProviderLoader.getProviders();
+    }
+
+    /**
+     * @return the application descriptor of every loaded provider, in provider iteration order
+     */
+    public Collection<ApplicationDesc> getApplicationDescs(){
+        List<ApplicationDesc> descs = new ArrayList<>();
+        for (ApplicationProvider provider : getProviders()) {
+            descs.add(provider.getApplicationDesc());
+        }
+        return descs;
+    }
+
+    /**
+     * @return whatever the loader resolves for the given application type
+     */
+    public ApplicationProvider<?> getApplicationProviderByType(String type) {
+        return appProviderLoader.getApplicationProviderByType(type);
+    }
+
+    /**
+     * Shortcut for the descriptor of the provider registered for the given type.
+     */
+    @Deprecated
+    public ApplicationDesc getApplicationDescByType(String appType) {
+        ApplicationProvider<?> provider = appProviderLoader.getApplicationProviderByType(appType);
+        return provider.getApplicationDesc();
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderConfigLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderConfigLoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderConfigLoader.java
deleted file mode 100644
index 11d9905..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderConfigLoader.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service.loader;
-
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.config.ApplicationProviderConfig;
-import org.apache.eagle.app.config.ApplicationProvidersConfig;
-import org.apache.eagle.app.service.ApplicationProviderLoader;
-import org.apache.eagle.app.spi.ApplicationProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.Unmarshaller;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-public class ApplicationProviderConfigLoader extends ApplicationProviderLoader {
- public final static String DEFAULT_APPLICATIONS_CONFIG_FILE = "providers.xml";
- private final static String APPLICATIONS_CONFIG_PROPS_KEY = "application.provider.config";
- private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderConfigLoader.class);
- public ApplicationProviderConfigLoader(Config config) {
- super(config);
- }
-
- @Override
- public void load() {
- List<ApplicationProviderConfig> applicationProviderConfigs = loadProvidersFromProvidersConf();
- int totalCount = applicationProviderConfigs.size();
- int loadedCount = 0,failedCount = 0;
- for(ApplicationProviderConfig providerConfig: applicationProviderConfigs){
- try {
- initializeProvider(providerConfig);
- loadedCount ++;
- }catch (Throwable ex){
- LOG.warn("Failed to initialized {}, ignored",providerConfig,ex);
- failedCount ++;
- }
- }
- LOG.info("Loaded {} app providers (total: {}, failed: {})",loadedCount,totalCount,failedCount);
- }
-
- public static boolean appProviderConfExists(String applicationConfFile){
- InputStream is = ApplicationProviderConfigLoader.class.getResourceAsStream(applicationConfFile);
- if(is == null){
- is = ApplicationProviderConfigLoader.class.getResourceAsStream("/"+applicationConfFile);
- }
-
- if(is != null){
- try {
- return true;
- } finally {
- try {
- is.close();
- } catch (IOException e) {
- LOG.debug(e.getMessage());
- }
- }
- } else {
- return false;
- }
- }
-
- private void initializeProvider(ApplicationProviderConfig providerConfig) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
- LOG.info("Loading application provider {} from {}",providerConfig.getClassName(),providerConfig.getJarPath());
- String providerClassName = providerConfig.getClassName();
- if(providerClassName == null) throw new RuntimeException("provider.classname is null: "+providerConfig);
- if(providerConfig.getJarPath() == null) throw new RuntimeException("provider.jarpath is null: "+providerConfig);
-
- Class<?> providerClass = Class.forName(providerClassName);
-
- if(!ApplicationProvider.class.isAssignableFrom(providerClass)){
- throw new RuntimeException("providerClassName is not implementation of "+ApplicationProvider.class.getCanonicalName());
- }
- ApplicationProvider provider = (ApplicationProvider) providerClass.newInstance();
- provider.prepare(providerConfig,this.getConfig());
- Preconditions.checkNotNull(provider.getApplicationDesc(),"appDesc is null");
- Preconditions.checkNotNull(provider.getApplicationDesc().getType(),"type is null");
- registerProvider(provider);
- }
-
- private List<ApplicationProviderConfig> loadProvidersFromProvidersConf() {
- String providerConfigFile = DEFAULT_APPLICATIONS_CONFIG_FILE;
- if(getConfig().hasPath(APPLICATIONS_CONFIG_PROPS_KEY)){
- providerConfigFile = getConfig().getString(APPLICATIONS_CONFIG_PROPS_KEY);
- LOG.info("Set {} = {}",APPLICATIONS_CONFIG_PROPS_KEY,providerConfigFile);
- }
- InputStream is = null;
- try {
- JAXBContext jc = JAXBContext.newInstance(ApplicationProvidersConfig.class);
- Unmarshaller unmarshaller = jc.createUnmarshaller();
- is = ApplicationProviderConfigLoader.class.getResourceAsStream(providerConfigFile);
- if(is == null){
- is = ApplicationProviderConfigLoader.class.getResourceAsStream("/"+providerConfigFile);
- }
- if(is == null){
- LOG.error("Application provider configuration {} is not found",providerConfigFile);
- }
- Preconditions.checkNotNull(is,providerConfigFile+" is not found");
- return ((ApplicationProvidersConfig) unmarshaller.unmarshal(is)).getProviders();
- }catch (Exception ex){
- LOG.error("Failed to load application provider configuration: {}",providerConfigFile,ex);
- throw new RuntimeException("Failed to load application provider configuration: "+providerConfigFile,ex);
- } finally {
- if(is != null) try {
- is.close();
- } catch (IOException e) {
- LOG.error(e.getMessage(),e);
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderSPILoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderSPILoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderSPILoader.java
deleted file mode 100644
index 33cb401..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderSPILoader.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service.loader;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.config.ApplicationProviderConfig;
-import org.apache.eagle.app.service.ApplicationProviderLoader;
-import org.apache.eagle.app.spi.ApplicationProvider;
-import org.apache.eagle.app.tools.DynamicJarPathFinder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ServiceLoader;
-import java.util.function.Function;
-
-public class ApplicationProviderSPILoader extends ApplicationProviderLoader{
- private final String appProviderExtDir;
- private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderSPILoader.class);
- private final static String APPLICATIONS_DIR_PROPS_KEY = "application.provider.dir";
-
- public ApplicationProviderSPILoader(Config config) {
- super(config);
- if(config.hasPath(APPLICATIONS_DIR_PROPS_KEY)) {
- this.appProviderExtDir = config.getString(APPLICATIONS_DIR_PROPS_KEY);
- }else{
- this.appProviderExtDir = null;
- }
-
- LOG.info("Using {}: {}",APPLICATIONS_DIR_PROPS_KEY,this.appProviderExtDir);
-
- }
-
- @Override
- public void load() {
- if(appProviderExtDir != null) {
- LOG.info("Loading application providers from class loader of jars in {}", appProviderExtDir);
- File loc = new File(appProviderExtDir);
- File[] jarFiles = loc.listFiles(file -> file.getPath().toLowerCase().endsWith(".jar"));
- if (jarFiles != null) {
- for (File jarFile : jarFiles) {
- try {
- URL jarFileUrl = jarFile.toURI().toURL();
- LOG.debug("Loading ApplicationProvider from jar: {}", jarFileUrl.toString());
- URLClassLoader jarFileClassLoader = new URLClassLoader(new URL[]{jarFileUrl});
- loadProviderFromClassLoader(jarFileClassLoader, (applicationProviderConfig) -> jarFileUrl.getPath());
- } catch (Exception e) {
- LOG.warn("Failed to load application provider from jar {}", jarFile,e);
- }
- }
- }
- } else {
- LOG.info("Loading application providers from context class loader");
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- loadProviderFromClassLoader(classLoader,(applicationProviderConfig) -> DynamicJarPathFinder.findPath(applicationProviderConfig.getClass()));
- }
- }
-
- private void loadProviderFromClassLoader(ClassLoader jarFileClassLoader, Function<ApplicationProviderConfig,String> jarFileSupplier){
- ServiceLoader<ApplicationProvider> serviceLoader = ServiceLoader.load(ApplicationProvider.class, jarFileClassLoader);
- for (ApplicationProvider applicationProvider : serviceLoader) {
- try {
- ApplicationProviderConfig providerConfig = new ApplicationProviderConfig();
- providerConfig.setClassName(applicationProvider.getClass().getCanonicalName());
- providerConfig.setJarPath(jarFileSupplier.apply(providerConfig));
- applicationProvider.prepare(providerConfig, getConfig());
- registerProvider(applicationProvider);
- }catch (Throwable ex){
- LOG.warn("Failed to register application provider {}",applicationProvider,ex);
- }
- }
- }
-}
[2/3] incubator-eagle git commit: [EAGLE-386] Refactor Application
Framework Interfaces and StreamEventMapper
Posted by ha...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
new file mode 100644
index 0000000..322ff8b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.app.sink.mapper.StreamEventMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base sink that converts each incoming tuple into stream events through a
+ * {@link StreamEventMapper} and hands every event to {@link #onEvent(StreamEvent)}.
+ */
+public abstract class AbstractStreamSink extends StreamSink {
+    private final static Logger LOG = LoggerFactory.getLogger(AbstractStreamSink.class);
+    private final static String KEY_FIELD = "KEY";
+    private final static String VALUE_FIELD = "VALUE";
+    private StreamEventMapper streamEventMapper;
+
+    /**
+     * Sets the mapper used by {@link #execute} to turn tuples into stream events.
+     *
+     * @return this sink, to allow call chaining
+     */
+    public AbstractStreamSink setEventMapper(StreamEventMapper streamEventMapper){
+        this.streamEventMapper = streamEventMapper;
+        return this;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context) {
+        super.prepare(stormConf, context);
+    }
+
+    /**
+     * Maps the tuple to events and dispatches each one to {@link #onEvent(StreamEvent)}.
+     *
+     * A failure on one event is logged with its cause and reported to the collector,
+     * after which the remaining events are still processed. A failure in the mapping
+     * step itself is logged and reported for the whole tuple.
+     */
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        try {
+            List<StreamEvent> streamEvents = streamEventMapper.map(input);
+            if (streamEvents != null) {
+                for (StreamEvent streamEvent : streamEvents) {
+                    try {
+                        onEvent(streamEvent);
+                    } catch (Exception e) {
+                        // Pass the exception as the last argument so the log carries the stack trace.
+                        LOG.error("Failed to execute event {}", streamEvent, e);
+                        collector.reportError(e);
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.error("Failed to execute event {}", input, e);
+            collector.reportError(e);
+        }
+    }
+
+    /**
+     * Handles a single mapped stream event; implemented by concrete sinks.
+     */
+    protected abstract void onEvent(StreamEvent streamEvent);
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(KEY_FIELD,VALUE_FIELD));
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index eee2a70..1490368 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -26,32 +26,27 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
-public class KafkaStreamSink extends StreamSink {
+public class KafkaStreamSink extends AbstractStreamSink {
private final static Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
- private final String topicId;
+ private String topicId;
- public KafkaStreamSink(StreamDefinition streamDefinition, ApplicationContext applicationContext) {
- super(streamDefinition, applicationContext);
- this.topicId = String.format("EAGLE_%s_%s_%s",
- applicationContext.getAppEntity().getSite().getSiteId(),
- applicationContext.getAppEntity().getDescriptor().getType(),
+ @Override
+ public void init(StreamDefinition streamDefinition, ApplicationContext context) {
+ this.topicId = String.format("EAGLE_TOPIC_%s_%s_%s",
+ context.getAppEntity().getSite().getSiteId(),
+ context.getAppEntity().getDescriptor().getType(),
streamDefinition.getStreamId());
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
super.prepare(stormConf, context);
- ensureTopic();
// TODO: Create KafkaProducer
}
- private void ensureTopic(){
- LOGGER.info("TODO: ensure kafka topic {} created",this.topicId);
- }
-
@Override
protected void onEvent(StreamEvent streamEvent) {
- LOGGER.info("TODO: producing {}",streamEvent);
+ LOGGER.info("TODO: producing {} to '{}'",streamEvent,topicId);
}
@Override
@@ -62,4 +57,22 @@ public class KafkaStreamSink extends StreamSink {
}
};
}
+
+ @Override
+ public void onAppInstall() {
+ ensureTopicCreated();
+ }
+
+ private void ensureTopicCreated(){
+ LOGGER.info("TODO: ensure kafka topic {} created",this.topicId);
+ }
+
+ private void ensureTopicDeleted(){
+ LOGGER.info("TODO: ensure kafka topic {} deleted",this.topicId);
+ }
+
+ @Override
+ public void onAppUninstall() {
+ ensureTopicDeleted();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
index ca201a8..8acf325 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
@@ -25,10 +25,12 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
-public class LoggingStreamSink extends StreamSink {
+public class LoggingStreamSink extends AbstractStreamSink {
- private final static Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
+ private final static Logger LOGGER = LoggerFactory.getLogger(LoggingStreamSink.class); // log under this class's own category
- public LoggingStreamSink(StreamDefinition streamDefinition, ApplicationContext applicationContext) {
- super(streamDefinition, applicationContext);
+
+ @Override
+ public void init(StreamDefinition streamDefinition, ApplicationContext context) {
+ // do nothing
}
@Override
@@ -40,4 +42,14 @@ public class LoggingStreamSink extends StreamSink {
public Map<String, Object> getSinkContext() {
return new HashMap<>();
}
+
+ @Override
+ public void onAppInstall() {
+ LOGGER.info("Executing onAppInstall callback, do nothing");
+ }
+
+ @Override
+ public void onAppUninstall() {
+ LOGGER.info("Executing onAppUninstall callback, do nothing");
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
index e0f2db2..2052484 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
@@ -1,82 +1,45 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.app.ApplicationContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public abstract class StreamSink extends BaseBasicBolt {
- private final static Logger LOG = LoggerFactory.getLogger(StreamSink.class);
- public final static String KEY_FIELD = "KEY";
- public final static String VALUE_FIELD = "VALUE";
-
- public StreamSink(StreamDefinition streamDefinition,ApplicationContext applicationContext){
-
- }
-
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- super.prepare(stormConf, context);
- }
-
- @Override
- public void execute(Tuple input, BasicOutputCollector collector) {
- List<Object> values = input.getValues();
- Object inputValue;
- if(values.size() == 1){
- inputValue = values.get(0);
- } else if(values.size() == 2){
- inputValue = values.get(1);
- } else{
- collector.reportError(new IllegalStateException("Expect tuple in size of 1: <StreamEvent> or 2: <Object,StreamEvent>, but got "+values.size()+": "+values));
- return;
- }
-
- if(inputValue instanceof StreamEvent){
- try {
- onEvent((StreamEvent) inputValue);
- }catch (Exception e){
- LOG.error("Failed to execute event {}",inputValue);
- collector.reportError(e);
- }
- } else {
- LOG.error("{} is not StreamEvent",inputValue);
- collector.reportError(new IllegalStateException("Input tuple "+input+"is not type of StreamEvent"));
- }
- }
-
- protected abstract void onEvent(StreamEvent streamEvent);
-
- public abstract Map<String,Object> getSinkContext();
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields(KEY_FIELD,VALUE_FIELD));
- }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import backtype.storm.topology.base.BaseBasicBolt;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.app.ApplicationContext;
+import org.apache.eagle.app.ApplicationLifecycleListener;
+
+import java.util.Map;
+
+public abstract class StreamSink extends BaseBasicBolt implements ApplicationLifecycleListener {
+ /**
+ * Should only initialize metadata in this method but must not open any resource like connection
+ *
+ * @param streamDefinition
+ * @param context
+ */
+ public abstract void init(StreamDefinition streamDefinition, ApplicationContext context);
+ public abstract Map<String,Object> getSinkContext();
+
+ @Override
+ public void onAppStart() {
+ // StreamSink by default will do nothing when application start
+ }
+
+ @Override
+ public void onAppStop() {
+ // StreamSink by default will do nothing when application stop
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java
new file mode 100644
index 0000000..631e02a
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+public interface DirectEventMapper extends StreamEventMapper{}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java
new file mode 100644
index 0000000..aa60414
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FieldIndexDirectEventMapper implements DirectEventMapper {
+ private final int[] fieldIndexs;
+
+ public FieldIndexDirectEventMapper(int ... fieldIndexs){
+ this.fieldIndexs = fieldIndexs;
+ }
+
+ @Override
+ public List<StreamEvent> map(Tuple tuple) throws Exception {
+ List<StreamEvent> events = new ArrayList<>(fieldIndexs.length);
+ for(int index:fieldIndexs){
+ events.add((StreamEvent) tuple.getValue(index));
+ }
+ return events;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java
new file mode 100644
index 0000000..bbb4a11
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class FieldNameDirectEventMapper implements DirectEventMapper {
+ private final String[] fieldNames;
+
+ public FieldNameDirectEventMapper(String ... fieldNames){
+ this.fieldNames = fieldNames;
+ }
+
+ @Override
+ public List<StreamEvent> map(Tuple tuple) throws Exception {
+ List<StreamEvent> events = new ArrayList<>(fieldNames.length);
+ for(String fieldName:fieldNames){
+ events.add((StreamEvent) tuple.getValueByField(fieldName));
+ }
+ return events;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java
new file mode 100644
index 0000000..cb8ee33
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+public class FlattenEventMapper implements StreamEventMapper {
+
+ private final StreamDefinition streamDefinition;
+ private final TimestampSelector timestampSelector;
+
+ private final static String DEFAULT_TIMESTAMP_COLUMN_NAME = "timestamp";
+ private final static Logger LOG = LoggerFactory.getLogger(FlattenEventMapper.class);
+
+ public FlattenEventMapper(StreamDefinition streamDefinition, TimestampSelector timestampSelector){
+ this.streamDefinition = streamDefinition;
+ this.timestampSelector = timestampSelector;
+ }
+
+ public FlattenEventMapper(StreamDefinition streamDefinition, String timestampFieldName){
+ this.streamDefinition = streamDefinition;
+ this.timestampSelector = tuple -> tuple.getLongByField(timestampFieldName);
+ }
+
+ public FlattenEventMapper(StreamDefinition streamDefinition){
+ this(streamDefinition,DEFAULT_TIMESTAMP_COLUMN_NAME);
+ }
+
+ @Override
+ public List<StreamEvent> map(Tuple tuple) throws Exception {
+ Long timestamp = 0L;
+ try {
+ timestamp = timestampSelector.apply(tuple);
+ } catch (Throwable fieldNotExistException){
+ if(streamDefinition.isTimeseries()) {
+ LOG.error("Stream (streamId = {}) is time series, but failed to detect timestamp, treating as {}", streamDefinition.getStreamId(), timestamp, fieldNotExistException);
+ } else{
+ /// Ignored for non-timeseries stream
+ }
+ }
+
+ StreamEvent streamEvent = new StreamEvent(streamDefinition.getStreamId(),
+ timestamp,
+ this.streamDefinition.getColumns().stream().map((column) -> {
+ Object value = null;
+ try {
+ value = tuple.getValueByField(column.getName());
+ }catch (IllegalArgumentException fieldNotExistException){
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Column '{}' of stream {} not exist in {}, treating as null", column.getName(), streamDefinition.getStreamId(), tuple, fieldNotExistException);
+ }
+ }
+ return value;
+ }).toArray());
+ return Collections.singletonList(streamEvent);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java
new file mode 100644
index 0000000..4f27423
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+import java.io.Serializable;
+import java.util.List;
+
+@FunctionalInterface
+public interface StreamEventMapper extends Serializable{
+ /**
+ * @param tuple
+ * @return
+ * @throws Exception
+ */
+ List<StreamEvent> map(Tuple tuple) throws Exception;
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java
new file mode 100644
index 0000000..cac66e7
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+import backtype.storm.tuple.Tuple;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+@FunctionalInterface
+public interface TimestampSelector extends Function<Tuple,Long>,Serializable{}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 37311eb..8d61066 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -22,7 +22,7 @@ import org.apache.eagle.app.Application;
import org.apache.eagle.app.config.ApplicationProviderConfig;
import org.apache.eagle.app.config.ApplicationProviderDescConfig;
import org.apache.eagle.app.sink.KafkaStreamSink;
-import org.apache.eagle.app.sink.StreamSink;
+import org.apache.eagle.app.sink.AbstractStreamSink;
import org.apache.eagle.metadata.model.ApplicationDesc;
import org.apache.eagle.metadata.model.ApplicationDocs;
import org.apache.eagle.metadata.model.Configuration;
@@ -78,8 +78,8 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme
envConfig.getString(APPLICATIONS_SINK_TYPE_PROPS_KEY) : DEFAULT_APPLICATIONS_SINK_TYPE;
try {
Class<?> sinkClass = Class.forName(sinkClassName);
- if(!StreamSink.class.isAssignableFrom(sinkClass)){
- throw new IllegalStateException(sinkClassName+ "is not assignable from "+StreamSink.class.getCanonicalName());
+ if(!AbstractStreamSink.class.isAssignableFrom(sinkClass)){
+ throw new IllegalStateException(sinkClassName+ "is not assignable from "+AbstractStreamSink.class.getCanonicalName());
}
applicationDesc.setSinkClass(sinkClass);
} catch (ClassNotFoundException e) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulator.java
deleted file mode 100644
index a3ef0ee..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.test;
-
-import org.apache.eagle.app.spi.ApplicationProvider;
-
-import java.util.Map;
-
-/**
- * Application test simulator for developer to quickly run application without diving into application lifecycle
- */
-public interface AppSimulator {
- /**
- *
- * @param appType
- */
- void submit(String appType);
- /**
- *
- * @param appType
- * @param appConfig
- */
- void submit(String appType, Map<String,Object> appConfig);
-
- /**
- *
- * @param appProviderClass
- */
- void submit(Class<? extends ApplicationProvider> appProviderClass);
-
- /**
- *
- * @param appProviderClass
- * @param appConfig
- */
- void submit(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java
index f5af815..2e4d33b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java
@@ -20,7 +20,7 @@ import com.google.inject.Inject;
import com.typesafe.config.Config;
import org.apache.eagle.app.config.ApplicationProviderConfig;
import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.AppOperations;
+import org.apache.eagle.app.service.ApplicationOperations;
import org.apache.eagle.app.spi.ApplicationProvider;
import org.apache.eagle.app.tools.DynamicJarPathFinder;
import org.apache.eagle.metadata.model.ApplicationEntity;
@@ -32,7 +32,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
-public class AppSimulatorImpl implements AppSimulator {
+public class AppSimulatorImpl extends ApplicationSimulator {
private final Config config;
private final SiteResource siteResource;
private final ApplicationResource applicationResource;
@@ -55,9 +55,9 @@ public class AppSimulatorImpl implements AppSimulator {
siteResource.createSite(siteEntity);
Assert.assertNotNull(siteEntity.getUuid());
// Install application
- ApplicationEntity applicationEntity = applicationResource.installApplication(new AppOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL));
+ ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL));
// Start application
- applicationResource.startApplication(new AppOperations.StartOperation(applicationEntity.getUuid()));
+ applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
}
private final static AtomicInteger incr = new AtomicInteger();
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
index b2b0194..7882a9b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
@@ -20,7 +20,7 @@ import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
import org.apache.eagle.app.ApplicationGuiceModule;
import org.apache.eagle.common.module.CommonGuiceModule;
-import org.apache.eagle.metadata.persistence.MemoryMetadataStore;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
public class AppTestGuiceModule extends AbstractModule{
@Override
@@ -28,6 +28,6 @@ public class AppTestGuiceModule extends AbstractModule{
install(new CommonGuiceModule());
install(new ApplicationGuiceModule());
install(new MemoryMetadataStore());
- bind(AppSimulator.class).to(AppSimulatorImpl.class).in(Singleton.class);
+ bind(ApplicationSimulator.class).to(AppSimulatorImpl.class).in(Singleton.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
new file mode 100644
index 0000000..f8114d2
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.Guice;
+import com.google.inject.Module;
+import org.apache.eagle.app.spi.ApplicationProvider;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Application test simulator for developer to quickly run application without diving into application lifecycle
+ */
+public abstract class ApplicationSimulator {
+ /**
+ *
+ * @param appType
+ */
+ public abstract void submit(String appType);
+
+ /**
+ *
+ * @param appType
+ * @param appConfig
+ */
+ public abstract void submit(String appType, Map<String,Object> appConfig);
+
+ /**
+ *
+ * @param appProviderClass
+ */
+ public abstract void submit(Class<? extends ApplicationProvider> appProviderClass);
+
+ /**
+ *
+ * @param appProviderClass
+ * @param appConfig
+ */
+ public abstract void submit(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception;
+
+ public static ApplicationSimulator getInstance(){
+ return Guice.createInjector(new AppTestGuiceModule()).getInstance(ApplicationSimulator.class);
+ }
+
+ /**
+ * @param modules additional modules
+ * @return ApplicationSimulator instance
+ */
+ public static ApplicationSimulator getInstance(Module ... modules){
+ List<Module> contextModules = new java.util.ArrayList<>(Arrays.asList(modules)); // Arrays.asList alone is fixed-size: add() would throw UnsupportedOperationException
+ contextModules.add(new AppTestGuiceModule());
+ return Guice.createInjector(contextModules).getInstance(ApplicationSimulator.class);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/resources/applications.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/resources/applications.xml b/eagle-core/eagle-app/eagle-app-base/src/main/resources/applications.xml
deleted file mode 100644
index 5f67807..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/resources/applications.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one or more
- ~ contributor license agreements. See the NOTICE file distributed with
- ~ this work for additional information regarding copyright ownership.
- ~ The ASF licenses this file to You under the Apache License, Version 2.0
- ~ (the "License"); you may not use this file except in compliance with
- ~ the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<applications>
- <application>
- <type>EXAMPLE_APP</type>
- <version>0.2.0</version>
- <name>Example Application</name>
- <jarPath>target/apache-eagle-example-app.jar</jarPath>
- <className>org.apache.eagle.app.base.example.ExampleApplication</className>
- <!-- 'view' provides UI elements like portal/widget/dashboard, etc. -->
- <viewPath>webapp/app/example</viewPath>
- <configuration>
- <property>
- <name>kafka.topic</name>
- <value>hdfs_audit</value>
- <description>Kafka Topic</description>
- </property>
- <property>
- <name>zookeeper.server</name>
- <displayName>Zookeeper Server</displayName>
- <value>localhost:2181</value>
- <description>Zookeeper Server address</description>
- </property>
- </configuration>
- </application>
-</applications>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/ApplicationProviderServiceTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/ApplicationProviderServiceTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/ApplicationProviderServiceTest.java
index 61f7542..a3f0ba0 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/ApplicationProviderServiceTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/ApplicationProviderServiceTest.java
@@ -19,12 +19,11 @@ package org.apache.eagle.app;
import com.google.inject.Guice;
import com.google.inject.Injector;
-import org.apache.eagle.app.config.ApplicationProviderConfig;
import org.apache.eagle.app.service.ApplicationProviderService;
import org.apache.eagle.app.spi.ApplicationProvider;
import org.apache.eagle.common.module.CommonGuiceModule;
import org.apache.eagle.metadata.model.ApplicationDesc;
-import org.apache.eagle.metadata.persistence.MemoryMetadataStore;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java
index 90a2b90..d379185 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java
@@ -28,45 +28,35 @@ import org.junit.Ignore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Arrays;
import java.util.Map;
@Ignore
public class TestApplicationImpl extends AbstractApplication {
private final static Logger LOG = LoggerFactory.getLogger(TestApplicationImpl.class);
- public class RandomEventSpout extends BaseRichSpout {
- SpoutOutputCollector _collector;
+ protected void buildApp(TopologyBuilder builder, ApplicationContext context) {
+ builder.setSpout("metric_spout", new RandomEventSpout(), 4);
+ builder.setBolt("sink_1",context.getFlattenStreamSink("SAMPLE_STREAM_1")).fieldsGrouping("metric_spout",new Fields("metric"));
+ builder.setBolt("sink_2",context.getFlattenStreamSink("SAMPLE_STREAM_2")).fieldsGrouping("metric_spout",new Fields("metric"));
+ }
- @SuppressWarnings("rawtypes")
+ private class RandomEventSpout extends BaseRichSpout {
+ private SpoutOutputCollector _collector;
@Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ _collector = spoutOutputCollector;
}
@Override
public void nextTuple() {
-
- }
-
- @Override
- public void ack(Object id) {
- //Ignored
+ _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+ _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
}
@Override
- public void fail(Object id) {
- _collector.emit(new Values(id), id);
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
}
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("key","event"));
- }
- }
-
- protected void buildTopology(TopologyBuilder builder, ApplicationContext context) {
- builder.setSpout("mockMetricSpout", new RandomEventSpout(), 4);
- builder.setBolt("sink_1",context.getStreamSink("TEST_STREAM_1")).fieldsGrouping("mockMetricSpout",new Fields("key"));
- builder.setBolt("sink_2",context.getStreamSink("TEST_STREAM_2")).fieldsGrouping("mockMetricSpout",new Fields("key"));
}
public static class Provider extends AbstractApplicationProvider<TestApplicationImpl> {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
index f86f903..10fcffc 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
@@ -18,8 +18,8 @@ package org.apache.eagle.app;
import com.google.inject.Inject;
import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.AppOperations;
-import org.apache.eagle.app.test.AppSimulator;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.test.ApplicationSimulator;
import org.apache.eagle.app.test.AppUnitTestRunner;
import org.apache.eagle.metadata.model.ApplicationDesc;
import org.apache.eagle.metadata.model.ApplicationEntity;
@@ -35,7 +35,7 @@ import java.util.Collection;
public class TestApplicationTestSuite {
@Inject private SiteResource siteResource;
@Inject private ApplicationResource applicationResource;
- @Inject private AppSimulator simulator;
+ @Inject private ApplicationSimulator simulator;
@Test
public void testApplicationProviderLoading(){
@@ -55,13 +55,13 @@ public class TestApplicationTestSuite {
Assert.assertNotNull(siteEntity.getUuid());
// Install application
- ApplicationEntity applicationEntity = applicationResource.installApplication(new AppOperations.InstallOperation("test_site","TEST_APPLICATION", ApplicationEntity.Mode.LOCAL));
+ ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","TEST_APPLICATION", ApplicationEntity.Mode.LOCAL));
// Start application
- applicationResource.startApplication(new AppOperations.StartOperation(applicationEntity.getUuid()));
+ applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
// Stop application
- applicationResource.stopApplication(new AppOperations.StopOperation(applicationEntity.getUuid()));
+ applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
// Uninstall application
- applicationResource.uninstallApplication(new AppOperations.UninstallOperation(applicationEntity.getUuid()));
+ applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
try {
applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
Assert.fail("Application instance (UUID: "+applicationEntity.getUuid()+") should have been uninstalled");
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml b/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml
index 36a64d0..4582ed1 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml
@@ -44,6 +44,10 @@
<type>string</type>
</column>
<column>
+ <name>source</name>
+ <type>string</type>
+ </column>
+ <column>
<name>value</name>
<type>double</type>
<defaultValue>0.0</defaultValue>
@@ -61,6 +65,10 @@
<type>string</type>
</column>
<column>
+ <name>source</name>
+ <type>string</type>
+ </column>
+ <column>
<name>value</name>
<type>double</type>
<defaultValue>0.0</defaultValue>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
index 49654c0..ac0b76c 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
@@ -40,7 +40,7 @@
}
},
"metadata":{
- "store": "org.apache.eagle.metadata.persistence.MemoryMetadataStore"
+ "store": "org.apache.eagle.metadata.service.memory.MemoryMetadataStore"
},
"application":{
"sink":{
@@ -48,7 +48,7 @@
"boostrap.server":"localhost:9092"
}
"provider":{
- "loader":"org.apache.eagle.app.service.loader.ApplicationProviderSPILoader"
+ "loader": "org.apache.eagle.app.service.impl.ApplicationProviderSPILoader"
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml b/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml
index 5e74a41..ffe8c73 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml
@@ -49,5 +49,9 @@
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-servlet</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.dropwizard</groupId>
+ <artifactId>dropwizard-servlets</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
index 09377ab..852041a 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
@@ -79,7 +79,7 @@ public class ApplicationEntity extends PersistenceEntity {
public void ensureDefault() {
super.ensureDefault();
if(this.appId == null){
- this.appId = String.format("EAGLE_APP_%s_%s",this.getSite().getSiteId(),this.getDescriptor().getType());
+ this.appId = String.format("EAGLE_APP[TYPE=%s,SITE=%s]",this.getDescriptor().getType(),this.getSite().getSiteId());
}
if(this.status == null){
this.status = Status.INITIAILIZED;
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java
new file mode 100644
index 0000000..42d7603
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metadata.model;
+
+import java.util.Map;
+
+/**
+ * Plain mutable descriptor of a stream sink: a sink type and a key/value context map.
+ */
+public class StreamSinkDesc {
+ // Class representing the sink type; null until setSinkType is called.
+ private Class<?> sinkType;
+ // Key/value context associated with the sink; null until setSinkContext is called.
+ private Map<String,Object> sinkContext;
+
+ /**
+ * @return the sink type, or null if none has been set
+ */
+ public Class<?> getSinkType() {
+ return sinkType;
+ }
+
+ /**
+ * @param sinkType the sink type to store (stored as-is, no validation)
+ */
+ public void setSinkType(Class<?> sinkType) {
+ this.sinkType = sinkType;
+ }
+
+ /**
+ * @return the sink context map (the stored reference, not a copy), or null if none has been set
+ */
+ public Map<String, Object> getSinkContext() {
+ return sinkContext;
+ }
+
+ /**
+ * @param sinkContext the sink context map to store (the reference is kept, not copied)
+ */
+ public void setSinkContext(Map<String, Object> sinkContext) {
+ this.sinkContext = sinkContext;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MemoryMetadataStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MemoryMetadataStore.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MemoryMetadataStore.java
deleted file mode 100644
index 5e3c2f4..0000000
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MemoryMetadataStore.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metadata.persistence;
-
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl;
-import org.apache.eagle.metadata.service.ApplicationEntityService;
-import org.apache.eagle.metadata.service.SiteEntityService;
-import org.apache.eagle.metadata.service.memory.ApplicationEntityServiceMemoryImpl;
-import org.apache.eagle.metadata.service.memory.SiteEntityEntityServiceMemoryImpl;
-
-public class MemoryMetadataStore extends MetadataStore {
- @Override
- protected void configure() {
- bind(SiteEntityService.class).to(SiteEntityEntityServiceMemoryImpl.class);
- bind(ApplicationEntityService.class).to(ApplicationEntityServiceMemoryImpl.class);
- bind(IMetadataDao.class).to(InMemMetadataDaoImpl.class);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MetadataStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MetadataStore.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MetadataStore.java
index 71b1476..0ad8c87 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MetadataStore.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MetadataStore.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -19,6 +19,7 @@ package org.apache.eagle.metadata.persistence;
import com.google.inject.AbstractModule;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,6 +28,7 @@ public abstract class MetadataStore extends AbstractModule {
public static final String METADATA_STORE_CONFIG_KEY = "metadata.store";
private static MetadataStore instance;
+
public static MetadataStore getInstance(){
String metadataStoreClass = null;
if(instance == null) {
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java
new file mode 100644
index 0000000..0bdb199
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metadata.service.memory;
+
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl;
+import org.apache.eagle.metadata.persistence.MetadataStore;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+import org.apache.eagle.metadata.service.SiteEntityService;
+
+/**
+ * {@link MetadataStore} Guice module that wires the metadata services to the memory-based
+ * implementations (as indicated by the bound class names).
+ */
+public class MemoryMetadataStore extends MetadataStore {
+ /**
+ * Binds SiteEntityService, ApplicationEntityService and IMetadataDao to
+ * SiteEntityEntityServiceMemoryImpl, ApplicationEntityServiceMemoryImpl and InMemMetadataDaoImpl respectively.
+ */
+ @Override
+ protected void configure() {
+ bind(SiteEntityService.class).to(SiteEntityEntityServiceMemoryImpl.class);
+ bind(ApplicationEntityService.class).to(ApplicationEntityServiceMemoryImpl.class);
+ bind(IMetadataDao.class).to(InMemMetadataDaoImpl.class);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/pom.xml b/eagle-examples/eagle-app-example/pom.xml
index 45c887c..5d00fc8 100644
--- a/eagle-examples/eagle-app-example/pom.xml
+++ b/eagle-examples/eagle-app-example/pom.xml
@@ -26,17 +26,11 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>eagle-app-example</artifactId>
<dependencies>
-
<dependency>
<groupId>org.apache.eagle</groupId>
<artifactId>eagle-app-base</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
- <version>1.10.19</version>
- </dependency>
</dependencies>
<build>
<resources>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplication.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplication.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplication.java
index 71037b3..5e6e548 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplication.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplication.java
@@ -16,15 +16,41 @@
*/
package org.apache.eagle.app.example;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
-import org.apache.eagle.app.ApplicationContext;
import org.apache.eagle.app.AbstractApplication;
+import org.apache.eagle.app.ApplicationContext;
+
+import java.util.Arrays;
+import java.util.Map;
public class ExampleApplication extends AbstractApplication {
- protected void buildTopology(TopologyBuilder builder, ApplicationContext context) {
- builder.setSpout("mockMetricSpout", new RandomEventSpout(), 4);
- builder.setBolt("sink_1",context.getStreamSink("SAMPLE_STREAM_1")).fieldsGrouping("mockMetricSpout",new Fields("key"));
- builder.setBolt("sink_2",context.getStreamSink("SAMPLE_STREAM_2")).fieldsGrouping("mockMetricSpout",new Fields("key"));
+ protected void buildApp(TopologyBuilder builder, ApplicationContext context) {
+ builder.setSpout("metric_spout", new RandomEventSpout(), 4);
+ builder.setBolt("sink_1",context.getFlattenStreamSink("SAMPLE_STREAM_1")).fieldsGrouping("metric_spout",new Fields("metric"));
+ builder.setBolt("sink_2",context.getFlattenStreamSink("SAMPLE_STREAM_2")).fieldsGrouping("metric_spout",new Fields("metric"));
+ }
+
+ private class RandomEventSpout extends BaseRichSpout {
+ private SpoutOutputCollector _collector;
+ @Override
+ public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+ _collector = spoutOutputCollector;
+ }
+
+ @Override
+ public void nextTuple() {
+ _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+ _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+ outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
index bd4e9b1..5949853 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
@@ -47,9 +47,8 @@ public class ExampleApplicationProvider extends AbstractApplicationProvider<Exam
sampleStreamDefinition.setDescription("Auto generated sample Schema for "+streamId);
List<StreamColumn> streamColumns = new ArrayList<>();
- streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
- streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
- streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+ streamColumns.add(new StreamColumn.Builder().name("metric").type(StreamColumn.Type.STRING).build());
+ streamColumns.add(new StreamColumn.Builder().name("source").type(StreamColumn.Type.STRING).build());
streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
sampleStreamDefinition.setColumns(streamColumns);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider2.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider2.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider2.java
index f4b3f2f..ca5a69b 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider2.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider2.java
@@ -23,7 +23,7 @@ import org.apache.eagle.app.spi.AbstractApplicationProvider;
*/
public class ExampleApplicationProvider2 extends AbstractApplicationProvider<ExampleApplication> {
public ExampleApplicationProvider2() {
- super("ExampleApplicationMetadata.xml");
+ super("/META-INF/apps/example/metadata.xml");
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/RandomEventSpout.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/RandomEventSpout.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/RandomEventSpout.java
deleted file mode 100644
index ac507b6..0000000
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/RandomEventSpout.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.example;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class RandomEventSpout extends BaseRichSpout {
- private final static Logger LOG = LoggerFactory.getLogger(RandomEventSpout.class);
- SpoutOutputCollector _collector;
-
- @SuppressWarnings("rawtypes")
- @Override
- public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public void nextTuple() {
-
- }
-
- @Override
- public void ack(Object id) {
- //Ignored
- }
-
- @Override
- public void fail(Object id) {
- _collector.emit(new Values(id), id);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("key","event"));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/resources/ExampleApplicationMetadata.xml
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/resources/ExampleApplicationMetadata.xml b/eagle-examples/eagle-app-example/src/main/resources/ExampleApplicationMetadata.xml
index b61ed54..0e5536c 100644
--- a/eagle-examples/eagle-app-example/src/main/resources/ExampleApplicationMetadata.xml
+++ b/eagle-examples/eagle-app-example/src/main/resources/ExampleApplicationMetadata.xml
@@ -42,6 +42,10 @@
<type>string</type>
</column>
<column>
+ <name>source</name>
+ <type>string</type>
+ </column>
+ <column>
<name>value</name>
<type>double</type>
<defaultValue>0.0</defaultValue>
@@ -59,6 +63,10 @@
<type>string</type>
</column>
<column>
+ <name>source</name>
+ <type>string</type>
+ </column>
+ <column>
<name>value</name>
<type>double</type>
<defaultValue>0.0</defaultValue>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/resources/META-INF/apps/example/metadata.xml
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/resources/META-INF/apps/example/metadata.xml b/eagle-examples/eagle-app-example/src/main/resources/META-INF/apps/example/metadata.xml
new file mode 100644
index 0000000..0e5536c
--- /dev/null
+++ b/eagle-examples/eagle-app-example/src/main/resources/META-INF/apps/example/metadata.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<application>
+ <type>EXAMPLE_APPLICATION_2</type>
+ <name>Example Monitoring Application</name>
+ <version>0.5.0-incubating</version>
+ <appClass>org.apache.eagle.app.example.ExampleApplication</appClass>
+ <viewPath>/apps/example</viewPath>
+ <configuration>
+ <property>
+ <name>message</name>
+ <displayName>Message</displayName>
+ <value>Hello, example application!</value>
+ <description>Just a sample configuration property</description>
+ </property>
+ </configuration>
+ <streams>
+ <stream>
+ <streamId>SAMPLE_STREAM_1</streamId>
+ <description>Sample output stream #1</description>
+ <validate>true</validate>
+ <timeseries>true</timeseries>
+ <columns>
+ <column>
+ <name>metric</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>source</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>value</name>
+ <type>double</type>
+ <defaultValue>0.0</defaultValue>
+ </column>
+ </columns>
+ </stream>
+ <stream>
+ <streamId>SAMPLE_STREAM_2</streamId>
+ <description>Sample output stream #2</description>
+ <validate>true</validate>
+ <timeseries>true</timeseries>
+ <columns>
+ <column>
+ <name>metric</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>source</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>value</name>
+ <type>double</type>
+ <defaultValue>0.0</defaultValue>
+ </column>
+ </columns>
+ </stream>
+ </streams>
+ <docs>
+ <install>
+# Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1
+
+# Step 2: Set up a data collector to flow data into the kafka topic
+
+./bin/logstash -f log_collector.conf
+
+## A sample `log_collector.conf` looks like the following:
+
+input {
+
+}
+filter {
+
+}
+output{
+
+}
+
+# Step 3: start application
+
+# Step 4: monitor with featured portal or alert with policies
+ </install>
+ <uninstall>
+# Step 1: stop and uninstall application
+# Step 2: delete kafka topic named "${site}_example_source_topic"
+# Step 3: stop logstash
+ </uninstall>
+ </docs>
+</application>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
index 3002025..f29c6ff 100644
--- a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
+++ b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
@@ -18,8 +18,8 @@ package org.apache.eagle.app.example;
import com.google.inject.Inject;
import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.AppOperations;
-import org.apache.eagle.app.test.AppSimulator;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.test.ApplicationSimulator;
import org.apache.eagle.app.test.AppUnitTestRunner;
import org.apache.eagle.metadata.model.ApplicationDesc;
import org.apache.eagle.metadata.model.ApplicationEntity;
@@ -35,7 +35,7 @@ import java.util.Collection;
public class ExampleApplicationTest {
@Inject private SiteResource siteResource;
@Inject private ApplicationResource applicationResource;
- @Inject private AppSimulator simulator;
+ @Inject private ApplicationSimulator simulator;
@Test
public void testApplicationProviderLoading(){
@@ -64,13 +64,13 @@ public class ExampleApplicationTest {
Assert.assertNotNull(siteEntity.getUuid());
// Install application
- ApplicationEntity applicationEntity = applicationResource.installApplication(new AppOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL));
+ ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL));
// Start application
- applicationResource.startApplication(new AppOperations.StartOperation(applicationEntity.getUuid()));
+ applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
// Stop application
- applicationResource.stopApplication(new AppOperations.StopOperation(applicationEntity.getUuid()));
+ applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
// Uninstall application
- applicationResource.uninstallApplication(new AppOperations.UninstallOperation(applicationEntity.getUuid()));
+ applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
try {
applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
Assert.fail("Application instance (UUID: "+applicationEntity.getUuid()+") should have been uninstalled");
@@ -88,4 +88,18 @@ public class ExampleApplicationTest {
public void testApplicationQuickRunWithAppProvider(){
simulator.submit(ExampleApplicationProvider.class);
}
+
+ @Test
+ public void testApplicationQuickRunWithAppProvider2(){
+ simulator.submit(ExampleApplicationProvider2.class);
+ }
+
+
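+ /**
+ * Debug entry point: submits {@link ExampleApplicationProvider} through {@link ApplicationSimulator#getInstance()}.
+ * @param args command-line arguments (not used)
+ */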
+ public static void main(String[] args){
+ ApplicationSimulator.getInstance().submit(ExampleApplicationProvider.class);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/test/java/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/java/resources/application.conf b/eagle-examples/eagle-app-example/src/test/java/resources/application.conf
new file mode 100644
index 0000000..18677b2
--- /dev/null
+++ b/eagle-examples/eagle-app-example/src/test/java/resources/application.conf
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+ "coordinator" : {
+ "policiesPerBolt" : 5,
+ "boltParallelism" : 5,
+ "policyDefaultParallelism" : 5,
+ "boltLoadUpbound": 0.8,
+ "topologyLoadUpbound" : 0.8,
+ "numOfAlertBoltsPerTopology" : 5,
+ "zkConfig" : {
+ "zkQuorum" : "127.0.0.1:2181",
+ "zkRoot" : "/alert",
+ "zkSessionTimeoutMs" : 10000,
+ "connectionTimeoutMs" : 10000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 3000
+ },
+ "metadataService" : {
+ "host" : "localhost",
+ "port" : 8080,
+ "context" : "/rest"
+ },
+ "metadataDynamicCheck" : {
+ "initDelayMillis" : 1000,
+ "delayMillis" : 30000
+ }
+ },
+ "metadata":{
+ "store": "org.apache.eagle.metadata.service.memory.MemoryMetadataStore"
+ },
+ "application":{
+ "sink":{
+ "type": "org.apache.eagle.app.sink.KafkaStreamSink",
+ "config": {
+ "kafkaBrokerHost" : "",
+ "kafkaZkConnection" : ""
+ }
+ },
+ "storm": {
+ "nimbusHost": "localhost"
+ "nimbusThriftPort": 6627
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
index c64fa83..d7bd14a 100644
--- a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
+++ b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
@@ -33,11 +33,12 @@ import org.apache.eagle.metadata.persistence.MetadataStore;
import javax.servlet.DispatcherType;
import java.util.EnumSet;
-public class ServerApplication extends Application<ServerConfig> {
+class ServerApplication extends Application<ServerConfig> {
+ private GuiceBundle<ServerConfig> guiceBundle;
@Override
public void initialize(Bootstrap<ServerConfig> bootstrap) {
- GuiceBundle<ServerConfig> guiceBundle = GuiceBundle.<ServerConfig>newBuilder()
+ guiceBundle = GuiceBundle.<ServerConfig>newBuilder()
.addModule(new CommonGuiceModule())
.addModule(MetadataStore.getInstance())
.addModule(new ApplicationGuiceModule())
@@ -63,11 +64,14 @@ public class ServerApplication extends Application<ServerConfig> {
// Swagger resources
environment.jersey().register(ApiListingResource.class);
+
BeanConfig swaggerConfig = new BeanConfig();
swaggerConfig.setTitle(ServerConfig.getServerName());
- swaggerConfig.setVersion(ServerConfig.getServerName());
+ swaggerConfig.setVersion(ServerConfig.getServerVersion());
swaggerConfig.setBasePath(ServerConfig.getApiBasePath());
swaggerConfig.setResourcePackage(ServerConfig.getResourcePackage());
+ swaggerConfig.setLicense(ServerConfig.getLicense());
+ swaggerConfig.setLicenseUrl(ServerConfig.getLicenseUrl());
swaggerConfig.setScan(true);
// Simple CORS filter
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/java/org/apache/eagle/server/ServerConfig.java
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/java/org/apache/eagle/server/ServerConfig.java b/eagle-server/src/main/java/org/apache/eagle/server/ServerConfig.java
index 5ca9256..55c5c5d 100644
--- a/eagle-server/src/main/java/org/apache/eagle/server/ServerConfig.java
+++ b/eagle-server/src/main/java/org/apache/eagle/server/ServerConfig.java
@@ -20,33 +20,43 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.dropwizard.Configuration;
-public class ServerConfig extends Configuration {
+class ServerConfig extends Configuration {
private final static String SERVER_NAME = "Apache Eagle";
private final static String SERVER_VERSION = "0.5.0-incubating";
private final static String API_BASE_PATH = "/rest/*";
private final static String CONTEXT_PATH="/";
private final static String RESOURCE_PACKAGE = "org.apache.eagle";
+ private final static String LICENSE = "Apache License (Version 2.0)";
+ private final static String LICENSE_URL = "http://www.apache.org/licenses/LICENSE-2.0";
public Config getConfig(){
return ConfigFactory.load();
}
- public static String getServerName(){
+ static String getServerName(){
return SERVER_NAME;
}
- public static String getServerVersion(){
+ static String getServerVersion(){
return SERVER_VERSION;
}
- public static String getApiBasePath(){
+ static String getApiBasePath(){
return API_BASE_PATH;
}
- public static String getResourcePackage(){
+ static String getResourcePackage(){
return RESOURCE_PACKAGE;
}
- public static String getContextPath(){
+ static String getContextPath(){
return CONTEXT_PATH;
}
+
+ public static String getLicense(){
+ return LICENSE;
+ }
+
+ static String getLicenseUrl(){
+ return LICENSE_URL;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf
index 48a2723..19b87b2 100644
--- a/eagle-server/src/main/resources/application.conf
+++ b/eagle-server/src/main/resources/application.conf
@@ -39,16 +39,13 @@
"delayMillis" : 30000
}
},
- "datastore": {
- "metadataDao": "org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl"
- }
"metadata":{
- "store": "org.apache.eagle.metadata.persistence.MemoryMetadataStore"
+ "store": "org.apache.eagle.metadata.service.memory.MemoryMetadataStore"
},
"application":{
"sink":{
- "type":"org.apache.eagle.app.sink.KafkaStreamSink"
- }
+ "type": "org.apache.eagle.app.sink.KafkaStreamSink"
+ },
"storm": {
"nimbusHost": "localhost"
"nimbusThriftPort": 6627