You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/07/22 16:54:13 UTC

[1/3] incubator-eagle git commit: [EAGLE-386] Refactor Application Framework Interfaces and StreamEventMapper

Repository: incubator-eagle
Updated Branches:
  refs/heads/develop e21b073f6 -> e73e35da0


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/resources/providers-disabled.xml
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/providers-disabled.xml b/eagle-server/src/main/resources/providers-disabled.xml
new file mode 100644
index 0000000..b434715
--- /dev/null
+++ b/eagle-server/src/main/resources/providers-disabled.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<providers>
+    <provider>
+        <jarPath>target/apache-eagle-example-app.jar</jarPath>
+        <className>org.apache.eagle.app.example.ExampleApplicationProvider</className>
+    </provider>
+    <provider>
+        <jarPath>target/apache-eagle-example-app.jar</jarPath>
+        <className>org.apache.eagle.app.example.ExampleApplicationProvider2</className>
+    </provider>
+</providers>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/resources/providers.xml
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/providers.xml b/eagle-server/src/main/resources/providers.xml
deleted file mode 100644
index b434715..0000000
--- a/eagle-server/src/main/resources/providers.xml
+++ /dev/null
@@ -1,27 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<providers>
-    <provider>
-        <jarPath>target/apache-eagle-example-app.jar</jarPath>
-        <className>org.apache.eagle.app.example.ExampleApplicationProvider</className>
-    </provider>
-    <provider>
-        <jarPath>target/apache-eagle-example-app.jar</jarPath>
-        <className>org.apache.eagle.app.example.ExampleApplicationProvider2</className>
-    </provider>
-</providers>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/WEB-INF/web.xml b/eagle-server/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..9b8c5ac
--- /dev/null
+++ b/eagle-server/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<web-app xmlns="http://java.sun.com/xml/ns/javaee"
+           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+           xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
+		  http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
+           version="3.0">
+    <welcome-file-list>
+        <welcome-file>index.html</welcome-file>
+    </welcome-file-list>
+
+    
+    <servlet>
+        <servlet-name>Jersey Web Application</servlet-name>
+        <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
+        <init-param>
+            <param-name>com.sun.jersey.config.property.packages</param-name>
+            <param-value>io.swagger.jaxrs.json,io.swagger.jaxrs.listing,org.apache.eagle,org.codehaus.jackson.jaxrs</param-value>
+        </init-param>
+        <init-param>
+            <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
+            <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;com.sun.jersey.api.container.filter.PostReplaceFilter</param-value>
+        </init-param>
+        <init-param>
+            <param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
+            <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
+        </init-param>
+        <load-on-startup>1</load-on-startup>
+    </servlet>
+    <!-- Servlet for swagger initialization only, no URL mapping. -->
+	<servlet>
+		<servlet-name>swaggerConfig</servlet-name>
+		<servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class>
+		<init-param>
+			<param-name>api.version</param-name>
+			<param-value>1.0.0</param-value>
+		</init-param>
+		<init-param>
+			<param-name>swagger.api.basepath</param-name>
+			<param-value>/rest</param-value>
+		</init-param>
+		<load-on-startup>2</load-on-startup>
+	</servlet>
+    <servlet-mapping>
+        <servlet-name>Jersey Web Application</servlet-name>
+        <url-pattern>/rest/*</url-pattern>
+    </servlet-mapping>
+    <filter>
+        <filter-name>CorsFilter</filter-name>
+        <!-- this should be replaced by tomcat ones, see also metadata resource -->
+        <filter-class>org.apache.eagle.alert.resource.SimpleCORSFiler</filter-class>
+        <init-param>
+            <param-name>cors.allowed.origins</param-name>
+            <param-value>*</param-value>
+        </init-param>
+        <init-param>
+            <param-name>cors.allowed.headers</param-name>
+            <param-value>Authorization,Origin, No-Cache, X-Requested-With, If-Modified-Since, Pragma, Last-Modified, Cache-Control, Expires, Content-Type, X-E4M-With, Accept</param-value>
+        </init-param>
+        <init-param>
+            <param-name>cors.allowed.methods</param-name>
+            <param-value>GET,POST,HEAD,OPTIONS,PUT,DELETE</param-value>
+        </init-param>
+        <init-param>
+            <param-name>cors.support.credentials</param-name>
+            <param-value>true</param-value>
+        </init-param>
+    </filter>
+    <filter-mapping>
+        <filter-name>CorsFilter</filter-name>
+        <url-pattern>/*</url-pattern>
+    </filter-mapping>
+</web-app>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d7d0ccb..ccae27f 100755
--- a/pom.xml
+++ b/pom.xml
@@ -809,6 +809,17 @@
                 </exclusions>
             </dependency>
             <dependency>
+                <groupId>io.dropwizard</groupId>
+                <artifactId>dropwizard-servlets</artifactId>
+                <version>${dropwizard.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
+            <dependency>
                 <groupId>io.swagger</groupId>
                 <artifactId>swagger-jersey-jaxrs</artifactId>
                 <version>${swagger.version}</version>


[3/3] incubator-eagle git commit: [EAGLE-386] Refactor Application Framework Interfaces and StreamEventMapper

Posted by ha...@apache.org.
[EAGLE-386] Refactor Application Framework Interfaces and StreamEventMapper

[Description]
h1. Application Framework Interfaces

Application Context (Runtime): org.apache.eagle.app.ApplicationContext
Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity
Application Processing Logic (Execution): org.apache.eagle.app.Application
Application Lifecycle Listener (Installation): org.apache.eagle.app.ApplicationLifecycleListener

h1. StreamEventMapper (Flatten/Direct)
* FlattenEventMapper
* DirectEventMapper

h1. Metadata.xml Path Best Practice
/META-INF/apps/example/metadata.xml

[Changes]
Decouple Tuple-to-Event mapping with StreamEventMapper

Rename EventMapper and Application#buildApp

Integrate ApplicationContext with ApplicationLifecycleListener to make sure the onAppInstall/onAppUninstall callbacks are invoked

Clarify application framework interfaces and docs

Clarify ApplicationContext as single entrance of app metadata/processing/lifecycle

Clean up application service code

Better metadata.xml practice: /META-INF/apps/example/metadata.xml

Fix ApplicationOperations JSON bug and MetadataStore-related compile error


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/e73e35da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/e73e35da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/e73e35da

Branch: refs/heads/develop
Commit: e73e35da0fa778564fb23582478541fe390f2a0a
Parents: e21b073
Author: Chen, Hao <hc...@ebay.com>
Authored: Fri Jul 22 17:27:28 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Sat Jul 23 00:50:40 2016 +0800

----------------------------------------------------------------------
 .../apache/eagle/app/AbstractApplication.java   |   4 +-
 .../java/org/apache/eagle/app/Application.java  |   2 +-
 .../apache/eagle/app/ApplicationContext.java    | 217 +++++++++++++++----
 .../eagle/app/ApplicationGuiceModule.java       |   4 +-
 .../eagle/app/ApplicationLifecycleListener.java |  39 ++++
 .../java/org/apache/eagle/app/package-info.java |  29 +++
 .../eagle/app/resource/ApplicationResource.java |  14 +-
 .../apache/eagle/app/service/AppOperations.java | 161 --------------
 .../service/ApplicationManagementService.java   |   8 +-
 .../ApplicationManagementServiceImpl.java       |  96 --------
 .../app/service/ApplicationOperations.java      | 189 ++++++++++++++++
 .../app/service/ApplicationProviderLoader.java  |   4 +-
 .../service/ApplicationProviderServiceImpl.java |  87 --------
 .../impl/ApplicationManagementServiceImpl.java  |  98 +++++++++
 .../impl/ApplicationProviderConfigLoader.java   | 128 +++++++++++
 .../impl/ApplicationProviderSPILoader.java      |  89 ++++++++
 .../impl/ApplicationProviderServiceImpl.java    |  89 ++++++++
 .../loader/ApplicationProviderConfigLoader.java | 128 -----------
 .../loader/ApplicationProviderSPILoader.java    |  89 --------
 .../eagle/app/sink/AbstractStreamSink.java      |  74 +++++++
 .../apache/eagle/app/sink/KafkaStreamSink.java  |  39 ++--
 .../eagle/app/sink/LoggingStreamSink.java       |  18 +-
 .../org/apache/eagle/app/sink/StreamSink.java   | 125 ++++-------
 .../app/sink/mapper/DirectEventMapper.java      |  19 ++
 .../mapper/FieldIndexDirectEventMapper.java     |  40 ++++
 .../sink/mapper/FieldNameDirectEventMapper.java |  40 ++++
 .../app/sink/mapper/FlattenEventMapper.java     |  78 +++++++
 .../app/sink/mapper/StreamEventMapper.java      |  33 +++
 .../app/sink/mapper/TimestampSelector.java      |  25 +++
 .../app/spi/AbstractApplicationProvider.java    |   6 +-
 .../org/apache/eagle/app/test/AppSimulator.java |  51 -----
 .../apache/eagle/app/test/AppSimulatorImpl.java |   8 +-
 .../eagle/app/test/AppTestGuiceModule.java      |   4 +-
 .../eagle/app/test/ApplicationSimulator.java    |  70 ++++++
 .../src/main/resources/applications.xml         |  42 ----
 .../app/ApplicationProviderServiceTest.java     |   3 +-
 .../apache/eagle/app/TestApplicationImpl.java   |  38 ++--
 .../eagle/app/TestApplicationTestSuite.java     |  14 +-
 .../test/resources/TestApplicationMetadata.xml  |   8 +
 .../src/test/resources/application.conf         |   4 +-
 .../eagle-metadata/eagle-metadata-base/pom.xml  |   4 +
 .../eagle/metadata/model/ApplicationEntity.java |   2 +-
 .../eagle/metadata/model/StreamSinkDesc.java    |  40 ++++
 .../persistence/MemoryMetadataStore.java        |  33 ---
 .../metadata/persistence/MetadataStore.java     |   4 +-
 .../service/memory/MemoryMetadataStore.java     |  32 +++
 eagle-examples/eagle-app-example/pom.xml        |   6 -
 .../eagle/app/example/ExampleApplication.java   |  36 ++-
 .../app/example/ExampleApplicationProvider.java |   5 +-
 .../example/ExampleApplicationProvider2.java    |   2 +-
 .../eagle/app/example/RandomEventSpout.java     |  59 -----
 .../resources/ExampleApplicationMetadata.xml    |   8 +
 .../META-INF/apps/example/metadata.xml          | 109 ++++++++++
 .../app/example/ExampleApplicationTest.java     |  28 ++-
 .../src/test/java/resources/application.conf    |  58 +++++
 .../apache/eagle/server/ServerApplication.java  |  10 +-
 .../org/apache/eagle/server/ServerConfig.java   |  22 +-
 .../src/main/resources/application.conf         |   9 +-
 .../src/main/resources/providers-disabled.xml   |  27 +++
 eagle-server/src/main/resources/providers.xml   |  27 ---
 eagle-server/src/main/webapp/WEB-INF/web.xml    |  88 ++++++++
 pom.xml                                         |  11 +
 62 files changed, 1821 insertions(+), 1013 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
index 1cb2625..5001a80 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/AbstractApplication.java
@@ -61,7 +61,7 @@ public abstract class AbstractApplication implements Application,Serializable {
         String topologyName = context.getAppEntity().getAppId();
 
         TopologyBuilder builder = new TopologyBuilder();
-        buildTopology(builder,context);
+        buildApp(builder,context);
         StormTopology topology = builder.createTopology();
         Config conf = getClusterStormConfig(context);
         if(appEntity.getMode() == ApplicationEntity.Mode.CLUSTER){
@@ -117,7 +117,7 @@ public abstract class AbstractApplication implements Application,Serializable {
         return conf;
     }
 
-    protected abstract void buildTopology(TopologyBuilder builder, ApplicationContext context);
+    protected abstract void buildApp(TopologyBuilder builder, ApplicationContext context);
 
     @Override
     public void stop(ApplicationContext context) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
index ffe5339..4d35e08 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/Application.java
@@ -17,7 +17,7 @@
 package org.apache.eagle.app;
 
 /**
- * Application Lifecycle Interface
+ * Application Execution Interface
  */
 public interface Application {
     /**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
index 1723250..dc2a456 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationContext.java
@@ -18,65 +18,80 @@ package org.apache.eagle.app;
 
 import com.typesafe.config.Config;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.app.sink.AbstractStreamSink;
 import org.apache.eagle.app.sink.StreamSink;
+import org.apache.eagle.app.sink.mapper.*;
+import org.apache.eagle.app.spi.ApplicationProvider;
 import org.apache.eagle.metadata.model.ApplicationEntity;
 import org.apache.eagle.metadata.model.StreamDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
- * Application Execution Context
+ * Application Context Interface: org.apache.eagle.app.ApplicationContext
+ *
+ * <ul>
+ *     <li>Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity</li>
+ *     <li>Application Processing Logic (Execution): org.apache.eagle.app.Application</li>
+ *     <li>Application Lifecycle Listener (Installation): org.apache.eagle.app.ApplicationLifecycleListener</li>
+ * </ul>
  */
-public class ApplicationContext implements Serializable {
+public class ApplicationContext implements Serializable, ApplicationLifecycleListener{
     private final Config envConfig;
     private final ApplicationEntity appEntity;
     private final static Logger LOG = LoggerFactory.getLogger(ApplicationContext.class);
-    private final Map<String, StreamSink> streamSinkMap;
+    private final Map<String, StreamDefinition> streamDefinitionMap;
     private final Map<String,StreamDesc> streamDescMap;
+    private final List<ApplicationLifecycleListener> applicationLifecycleListeners;
+    private final ApplicationProvider appProvider;
 
-    public ApplicationContext(ApplicationEntity appEntity, Config envConfig){
+    /**
+     * @param appEntity ApplicationEntity
+     * @param appProvider ApplicationProvider
+     * @param envConfig Config
+     */
+    public ApplicationContext(ApplicationEntity appEntity, ApplicationProvider appProvider, Config envConfig){
         this.appEntity = appEntity;
         this.envConfig = envConfig;
-        this.streamSinkMap = new HashMap<>();
+        this.appProvider = appProvider;
+        this.streamDefinitionMap = new HashMap<>();
         this.streamDescMap = new HashMap<>();
-
-        // TODO: Decouple out of ApplicationContext constructor
+        this.applicationLifecycleListeners = new LinkedList<>();
         doInit();
     }
 
+    public void registerListener(ApplicationLifecycleListener listener){
+        applicationLifecycleListeners.add(listener);
+    }
+
+    public List<ApplicationLifecycleListener> getListeners(){
+        return applicationLifecycleListeners;
+    }
+
     private void doInit() {
-        try {
-            Class<?> sinkClass = appEntity.getDescriptor().getSinkClass();
-            List<StreamDefinition> outputStreams = appEntity.getDescriptor().getStreams();
-            if(null != outputStreams){
-                Constructor constructor = sinkClass.getConstructor(StreamDefinition.class,ApplicationContext.class);
-                outputStreams.forEach((stream) -> {
-                    try {
-                        StreamSink streamSink = (StreamSink) constructor.newInstance(stream,this);
-                        streamSinkMap.put(stream.getStreamId(), streamSink);
-                        StreamDesc streamDesc = new StreamDesc();
-                        streamDesc.setStreamSchema(stream);
-                        streamDesc.setSinkContext(streamSink.getSinkContext());
-                        streamDesc.setSinkType(sinkClass);
-                        streamDesc.setStreamId(stream.getStreamId());
-                        streamDescMap.put(streamDesc.getStreamId(),streamDesc);
-                    } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
-                        LOG.error("Failed to initialize instance "+sinkClass.getCanonicalName()+" for application: {}",this.getAppEntity());
-                        throw new RuntimeException("Failed to initialize instance "+sinkClass.getCanonicalName()+" for application:"+this.getAppEntity(),e);
-                    }
-                });
-            }
-        } catch (NoSuchMethodException e) {
-            LOG.error(e.getMessage(),e);
-            throw new RuntimeException(e.getMessage(),e.getCause());
+        Class<?> sinkClass = appEntity.getDescriptor().getSinkClass();
+        List<StreamDefinition> outputStreams = appEntity.getDescriptor().getStreams();
+        if(null != outputStreams){
+            outputStreams.forEach((stream) -> {
+                try {
+                    StreamSink streamSink = (StreamSink) sinkClass.newInstance();
+                    streamSink.init(stream,this);
+                    StreamDesc streamDesc = new StreamDesc();
+                    streamDesc.setStreamSchema(stream);
+                    streamDesc.setSinkContext(streamSink.getSinkContext());
+                    streamDesc.setSinkType(sinkClass);
+                    streamDesc.setStreamId(stream.getStreamId());
+                    streamDescMap.put(streamDesc.getStreamId(),streamDesc);
+                    streamDefinitionMap.put(streamDesc.getStreamId(),stream);
+                    registerListener(streamSink);
+                } catch (InstantiationException | IllegalAccessException e) {
+                    LOG.error("Failed to initialize instance "+sinkClass.getCanonicalName()+" for application: {}",this.getAppEntity());
+                    throw new RuntimeException("Failed to initialize instance "+sinkClass.getCanonicalName()+" for application:"+this.getAppEntity(),e);
+                }
+            });
         }
     }
 
@@ -94,15 +109,133 @@ public class ApplicationContext implements Serializable {
      * @param streamId
      * @return
      */
-    public StreamSink getStreamSink(String streamId){
-        if(streamSinkMap.containsKey(streamId)) {
-            return streamSinkMap.get(streamId);
-        } else {
-            throw new IllegalStateException("Stream (ID: "+streamId+") was not declared in "+this.appEntity.getDescriptor().getProviderClass().getCanonicalName()+" before being called");
+    public StreamSink getFlattenStreamSink(String streamId, StreamEventMapper mapper){
+        checkStreamExists(streamId);
+        Class<?> sinkClass = appEntity.getDescriptor().getSinkClass();
+        try {
+            AbstractStreamSink abstractStreamSink = (AbstractStreamSink) sinkClass.newInstance();
+            abstractStreamSink.setEventMapper(mapper);
+            abstractStreamSink.init(streamDefinitionMap.get(streamId),this);
+            return abstractStreamSink;
+        } catch (InstantiationException | IllegalAccessException e) {
+            LOG.error("Failed to instantiate "+sinkClass,e);
+            throw new IllegalStateException("Failed to instantiate "+sinkClass,e);
+        }
+    }
+
+    /**
+     * Make sure streamId is declared in Application Providers
+     *
+     * @param streamId
+     * @return
+     */
+    public StreamSink getDirectStreamSink(String streamId, String ... fieldNames){
+        return getFlattenStreamSink(streamId,new FieldNameDirectEventMapper(fieldNames));
+    }
+
+    /**
+     * Make sure streamId is declared in Application Providers
+     *
+     * @param streamId
+     * @return
+     */
+    public StreamSink getDirectStreamSink(String streamId, int ... fieldIndexs){
+        return getFlattenStreamSink(streamId,new FieldIndexDirectEventMapper(fieldIndexs));
+    }
+
+    /**
+     * Make sure streamId is declared in Application Providers
+     *
+     * @param streamId
+     * @return
+     */
+    public StreamSink getFlattenStreamSink(String streamId, TimestampSelector timestampSelector){
+        checkStreamExists(streamId);
+        return getFlattenStreamSink(streamId,new FlattenEventMapper(streamDefinitionMap.get(streamId),timestampSelector));
+    }
+
+    /**
+     * Make sure streamId is declared in Application Providers
+     *
+     * @param streamId
+     * @return
+     */
+    public StreamSink getFlattenStreamSink(String streamId, String timestampField){
+        checkStreamExists(streamId);
+        return getFlattenStreamSink(streamId,new FlattenEventMapper(streamDefinitionMap.get(streamId),timestampField));
+    }
+
+    /**
+     * Make sure streamId is declared in Application Providers
+     *
+     * @param streamId
+     * @return
+     */
+    public StreamSink getFlattenStreamSink(String streamId){
+        checkStreamExists(streamId);
+        return getFlattenStreamSink(streamId,new FlattenEventMapper(streamDefinitionMap.get(streamId)));
+    }
+
+    private void checkStreamExists(String streamId){
+        if(! streamDefinitionMap.containsKey(streamId)){
+            LOG.error("Stream [streamId = "+streamId+"] is not defined in "
+                    + appEntity.getDescriptor().getProviderClass().getCanonicalName());
+            throw new IllegalStateException("Stream [streamId = "+streamId+"] is not defined in "
+                    + appEntity.getDescriptor().getProviderClass().getCanonicalName());
         }
     }
 
     public Collection<StreamDesc> getStreamSinkDescs(){
         return streamDescMap.values();
     }
+
+    @Override
+    public void onAppInstall() {
+        getListeners().forEach((listener)->{
+            try {
+                listener.onAppInstall();
+            }catch (Throwable throwable){
+                LOG.error("Failed to invoked onAppInstall of listener {}",listener.toString(),throwable);
+                throw throwable;
+            }
+        });
+    }
+
+    @Override
+    public void onAppUninstall() {
+        getListeners().forEach((listener)->{
+            try {
+                listener.onAppUninstall();
+            }catch (Throwable throwable){
+                LOG.error("Failed to invoked onAppUninstall of listener {}",listener.toString(),throwable);
+                throw throwable;
+            }
+        });
+    }
+
+    @Override
+    public void onAppStart() {
+        getListeners().forEach((listener)->{
+            try {
+                listener.onAppStart();
+            }catch (Throwable throwable){
+                LOG.error("Failed to invoked onAppStart of listener {}",listener.toString(),throwable);
+                throw throwable;
+            }
+        });
+        appProvider.getApplication().start(this);
+    }
+
+    @Override
+    public void onAppStop() {
+        appProvider.getApplication().stop(this);
+        getListeners().forEach((listener)->{
+            try {
+                listener.onAppStop();
+            }catch (Throwable throwable){
+                LOG.error("Failed to invoked onAppStop of listener {}",listener.toString(),throwable);
+                throw throwable;
+            }
+        });
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationGuiceModule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationGuiceModule.java
index 9a66b6c..ca3b8f2 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationGuiceModule.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationGuiceModule.java
@@ -19,9 +19,9 @@ package org.apache.eagle.app;
 import com.google.inject.AbstractModule;
 import com.google.inject.Singleton;
 import org.apache.eagle.app.service.ApplicationManagementService;
-import org.apache.eagle.app.service.ApplicationManagementServiceImpl;
+import org.apache.eagle.app.service.impl.ApplicationManagementServiceImpl;
 import org.apache.eagle.app.service.ApplicationProviderService;
-import org.apache.eagle.app.service.ApplicationProviderServiceImpl;
+import org.apache.eagle.app.service.impl.ApplicationProviderServiceImpl;
 import org.apache.eagle.metadata.service.ApplicationDescService;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java
new file mode 100644
index 0000000..6ced960
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/ApplicationLifecycleListener.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app;
+
+public interface ApplicationLifecycleListener { // callbacks for application install / uninstall / start / stop events
+    /**
+     * Callback: on application installed.
+     */
+    void onAppInstall();
+
+    /**
+     * Callback: on application uninstalled.
+     */
+    void onAppUninstall();
+
+    /**
+     * Callback: on application start. NOTE(review): before/after timing is not specified here - confirm.
+     */
+    void onAppStart();
+
+    /**
+     * Callback: on application stop. NOTE(review): before/after timing is not specified here - confirm.
+     */
+    void onAppStop();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
new file mode 100644
index 0000000..8f1fcb9
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ * <h1>Application Management Framework Interfaces</h1>
+ *
+ * <ul>
+ *     <li>Application Context (Runtime): org.apache.eagle.app.ApplicationContext</li>
+ *     <li>Application Metadata Entity (Persistence): org.apache.eagle.metadata.model.ApplicationEntity</li>
+ *     <li>Application Processing Logic (Execution): org.apache.eagle.app.Application</li>
+ *     <li>Application Lifecycle Listener (Callback): org.apache.eagle.app.ApplicationLifecycleListener</li>
+ * </ul>
+ */
+package org.apache.eagle.app;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
index a0654ff..b0e9988 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/resource/ApplicationResource.java
@@ -19,7 +19,7 @@ package org.apache.eagle.app.resource;
 
 import com.google.inject.Inject;
 import org.apache.eagle.app.service.ApplicationManagementService;
-import org.apache.eagle.app.service.AppOperations;
+import org.apache.eagle.app.service.ApplicationOperations;
 import org.apache.eagle.app.service.ApplicationProviderService;
 import org.apache.eagle.metadata.model.ApplicationDesc;
 import org.apache.eagle.metadata.model.ApplicationEntity;
@@ -59,7 +59,7 @@ public class ApplicationResource {
         return providerService.getApplicationDescByType(type);
     }
 
-    @POST
+    @PUT
     @Path("/providers/reload")
     @Produces(MediaType.APPLICATION_JSON)
     public Collection<ApplicationDesc> reloadApplicationDescs(){
@@ -97,7 +97,7 @@ public class ApplicationResource {
     @Path("/install")
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public ApplicationEntity installApplication(AppOperations.InstallOperation operation){
+    public ApplicationEntity installApplication(ApplicationOperations.InstallOperation operation){
         return applicationManagementService.install(operation);
     }
 
@@ -111,11 +111,11 @@ public class ApplicationResource {
      *
      * @param operation
      */
-    @POST
+    @DELETE
     @Path("/uninstall")
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public ApplicationEntity uninstallApplication(AppOperations.UninstallOperation operation){
+    public ApplicationEntity uninstallApplication(ApplicationOperations.UninstallOperation operation){
         return applicationManagementService.uninstall(operation);
     }
 
@@ -132,7 +132,7 @@ public class ApplicationResource {
     @Path("/start")
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public ApplicationEntity startApplication(AppOperations.StartOperation operation){
+    public ApplicationEntity startApplication(ApplicationOperations.StartOperation operation){
         return applicationManagementService.start(operation);
     }
 
@@ -149,7 +149,7 @@ public class ApplicationResource {
     @Path("/stop")
     @Consumes(MediaType.APPLICATION_JSON)
     @Produces(MediaType.APPLICATION_JSON)
-    public ApplicationEntity stopApplication(AppOperations.StopOperation operation){
+    public ApplicationEntity stopApplication(ApplicationOperations.StopOperation operation){
         return applicationManagementService.stop(operation);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/AppOperations.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/AppOperations.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/AppOperations.java
deleted file mode 100644
index f2395d6..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/AppOperations.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service;
-
-import org.apache.eagle.metadata.model.ApplicationEntity;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public class AppOperations {
-    interface Operation extends Serializable {
-        //
-    }
-
-    public static class InstallOperation implements Operation{
-        private String siteId;
-        private String appType;
-        private ApplicationEntity.Mode mode = ApplicationEntity.Mode.LOCAL;
-        private Map<String,Object> configuration;
-
-        public InstallOperation(){}
-        public InstallOperation(String siteId,String appType){
-            this.setSiteId(siteId);
-            this.setAppType(appType);
-        }
-        public InstallOperation(String siteId,String appType,ApplicationEntity.Mode mode){
-            this.setSiteId(siteId);
-            this.setAppType(appType);
-            this.setMode(mode);
-        }
-        public InstallOperation(String siteId,String appType,ApplicationEntity.Mode mode,Map<String,Object> configuration){
-            this.setSiteId(siteId);
-            this.setAppType(appType);
-            this.setMode(mode);
-            this.setConfiguration(configuration);
-        }
-
-        public String getSiteId() {
-           return siteId;
-       }
-        public void setSiteId(String siteId) {
-           this.siteId = siteId;
-       }
-        public String getAppType() {
-           return appType;
-       }
-        public void setAppType(String appType) {
-           this.appType = appType;
-       }
-
-        public Map<String, Object> getConfiguration() {
-            return configuration;
-        }
-
-        public void setConfiguration(Map<String, Object> configuration) {
-            this.configuration = configuration;
-        }
-
-        public ApplicationEntity.Mode getMode() {
-            return mode;
-        }
-
-        public void setMode(ApplicationEntity.Mode mode) {
-            this.mode = mode;
-        }
-    }
-
-    public static class UninstallOperation implements Operation{
-        private String uuid;
-        private String appId;
-        public UninstallOperation(String uuid){
-            this.setUuid(uuid);
-        }
-        public UninstallOperation(String uuid,String appId){
-            this.setUuid(uuid);
-            this.setAppId(appId);
-        }
-
-        public String getUuid() {
-            return uuid;
-        }
-        public void setUuid(String uuid) {
-            this.uuid = uuid;
-        }
-
-        public String getAppId() {
-            return appId;
-        }
-
-        public void setAppId(String appId) {
-            this.appId = appId;
-        }
-    }
-
-    public static class StartOperation implements Operation{
-        private String uuid;
-        private String appId;
-        public StartOperation(String uuid){
-            this.setUuid(uuid);
-        }
-        public StartOperation(String uuid,String appId){
-            this.setUuid(uuid);
-            this.setAppId(appId);
-        }
-        public String getUuid() {
-            return uuid;
-        }
-        public void setUuid(String uuid) {
-            this.uuid = uuid;
-        }
-
-        public String getAppId() {
-            return appId;
-        }
-
-        public void setAppId(String appId) {
-            this.appId = appId;
-        }
-    }
-
-    public static class StopOperation implements Operation{
-        private String uuid;
-        private String appId;
-
-        public StopOperation(String uuid){
-            this.setUuid(uuid);
-        }
-        public StopOperation(String uuid,String appId){
-            this.setUuid(uuid);
-            this.setAppId(appId);
-        }
-        public String getUuid() {
-            return uuid;
-        }
-        public void setUuid(String uuid) {
-            this.uuid = uuid;
-        }
-
-        public String getAppId() {
-            return appId;
-        }
-
-        public void setAppId(String appId) {
-            this.appId = appId;
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementService.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementService.java
index 965a3fa..30ae0b9 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementService.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementService.java
@@ -24,26 +24,26 @@ public interface ApplicationManagementService {
      * @param operation
      * @return
      */
-    ApplicationEntity install(AppOperations.InstallOperation operation);
+    ApplicationEntity install(ApplicationOperations.InstallOperation operation);
 
     /**
      *
      * @param operation
      * @return
      */
-    ApplicationEntity uninstall(AppOperations.UninstallOperation operation);
+    ApplicationEntity uninstall(ApplicationOperations.UninstallOperation operation);
 
     /**
      *
      * @param operation
      * @return
      */
-    ApplicationEntity start(AppOperations.StartOperation operation);
+    ApplicationEntity start(ApplicationOperations.StartOperation operation);
 
     /**
      *
      * @param operation
      * @return
      */
-    ApplicationEntity stop(AppOperations.StopOperation operation);
+    ApplicationEntity stop(ApplicationOperations.StopOperation operation);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementServiceImpl.java
deleted file mode 100644
index 566fb62..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationManagementServiceImpl.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service;
-
-import com.google.common.base.Preconditions;
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.Application;
-import org.apache.eagle.app.ApplicationContext;
-import org.apache.eagle.metadata.model.ApplicationDesc;
-import org.apache.eagle.metadata.model.ApplicationEntity;
-import org.apache.eagle.metadata.model.SiteEntity;
-import org.apache.eagle.metadata.service.ApplicationEntityService;
-import org.apache.eagle.metadata.service.SiteEntityService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Singleton
-public class ApplicationManagementServiceImpl implements ApplicationManagementService {
-    private final SiteEntityService siteEntityService;
-    private final ApplicationProviderService applicationProviderService;
-    private final ApplicationEntityService applicationEntityService;
-    private final Config config;
-    private final static Logger LOGGER = LoggerFactory.getLogger(ApplicationManagementServiceImpl.class);
-
-    @Inject
-    public ApplicationManagementServiceImpl(
-            Config config,
-            SiteEntityService siteEntityService,
-            ApplicationProviderService applicationProviderService,
-            ApplicationEntityService applicationEntityService){
-        this.config = config;
-        this.siteEntityService = siteEntityService;
-        this.applicationProviderService = applicationProviderService;
-        this.applicationEntityService = applicationEntityService;
-    }
-
-    public ApplicationEntity install(AppOperations.InstallOperation operation) {
-        Preconditions.checkNotNull(operation.getSiteId(),"siteId is null");
-        Preconditions.checkNotNull(operation.getAppType(),"appType is null");
-        SiteEntity siteEntity = siteEntityService.getBySiteId(operation.getSiteId());
-        Preconditions.checkNotNull(siteEntity,"Site with ID: "+operation.getSiteId()+" is not found");
-        ApplicationDesc appDesc = applicationProviderService.getApplicationDescByType(operation.getAppType());
-        Preconditions.checkNotNull("Application with TYPE: "+operation.getAppType()+" is not found");
-        ApplicationEntity applicationEntity = new ApplicationEntity();
-        applicationEntity.setDescriptor(appDesc);
-        applicationEntity.setSite(siteEntity);
-        applicationEntity.setConfiguration(operation.getConfiguration());
-        applicationEntity.setMode(operation.getMode());
-        ApplicationContext applicationContext = new ApplicationContext(applicationEntity,config);
-        applicationEntity.setStreams(applicationContext.getStreamSinkDescs());
-        applicationEntityService.create(applicationEntity);
-        return applicationEntity;
-    }
-
-    public ApplicationEntity uninstall(AppOperations.UninstallOperation operation) {
-        ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
-        Application application = applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication();
-        // TODO: Check status, skip stop if already STOPPED
-        try {
-            application.stop(new ApplicationContext(applicationEntity, this.config));
-        }catch (Throwable throwable){
-            LOGGER.error(throwable.getMessage(),throwable);
-        }
-        return applicationEntityService.delete(applicationEntity);
-    }
-
-    public ApplicationEntity start(AppOperations.StartOperation operation) {
-        ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
-        Application application = applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication();
-        application.start(new ApplicationContext(applicationEntity,this.config));
-        return applicationEntity;
-    }
-
-    public ApplicationEntity stop(AppOperations.StopOperation operation) {
-        ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
-        Application application = applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication();
-        application.stop(new ApplicationContext(applicationEntity,this.config));
-        return applicationEntity;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperations.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperations.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperations.java
new file mode 100644
index 0000000..792b7d4
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationOperations.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service;
+
+import org.apache.eagle.metadata.model.ApplicationEntity;
+
+import java.io.Serializable;
+import java.util.Map;
+
+public final class ApplicationOperations { // holder for the install / uninstall / start / stop operation types
+    interface Operation extends Serializable { // common contract of all operation types declared in this class
+        String getType(); // operation kind as a string; implementations below return one of the constants that follow
+    }
+
+    private final static String INSTALL = "INSTALL"; // returned by InstallOperation.getType()
+    private final static String UNINSTALL = "UNINSTALL"; // returned by UninstallOperation.getType()
+    private final static String START = "START"; // returned by StartOperation.getType()
+    private final static String STOP = "STOP"; // returned by StopOperation.getType()
+
+    public static class InstallOperation implements Operation{ // install parameters: site id, application type, mode, configuration
+        private String siteId;
+        private String appType;
+        private ApplicationEntity.Mode mode = ApplicationEntity.Mode.LOCAL; // stays LOCAL unless setMode / a constructor overrides it
+        private Map<String,Object> configuration; // null unless set explicitly
+
+        public InstallOperation(){} // no-arg: siteId, appType and configuration stay null, mode stays LOCAL
+        public InstallOperation(String siteId,String appType){
+            this.setSiteId(siteId);
+            this.setAppType(appType);
+        }
+        public InstallOperation(String siteId,String appType,ApplicationEntity.Mode mode){
+            this.setSiteId(siteId);
+            this.setAppType(appType);
+            this.setMode(mode);
+        }
+        public InstallOperation(String siteId,String appType,ApplicationEntity.Mode mode,Map<String,Object> configuration){
+            this.setSiteId(siteId);
+            this.setAppType(appType);
+            this.setMode(mode);
+            this.setConfiguration(configuration);
+        }
+
+        public String getSiteId() {
+           return siteId;
+       }
+        public void setSiteId(String siteId) {
+           this.siteId = siteId;
+       }
+        public String getAppType() {
+           return appType;
+       }
+        public void setAppType(String appType) {
+           this.appType = appType;
+       }
+
+        public Map<String, Object> getConfiguration() {
+            return configuration;
+        }
+
+        public void setConfiguration(Map<String, Object> configuration) {
+            this.configuration = configuration;
+        }
+
+        public ApplicationEntity.Mode getMode() {
+            return mode;
+        }
+
+        public void setMode(ApplicationEntity.Mode mode) {
+            this.mode = mode;
+        }
+
+        @Override
+        public String getType() {
+            return INSTALL;
+        }
+    }
+
+    public static class UninstallOperation implements Operation{ // identifies the application to uninstall by uuid and/or appId
+        private String uuid;
+        private String appId;
+        public UninstallOperation(){} // no-arg: both ids stay null; presumably needed to deserialize request bodies - TODO confirm
+        public UninstallOperation(String uuid){
+            this.setUuid(uuid);
+        }
+        public UninstallOperation(String uuid,String appId){
+            this.setUuid(uuid);
+            this.setAppId(appId);
+        }
+
+        public String getUuid() {
+            return uuid;
+        }
+        public void setUuid(String uuid) {
+            this.uuid = uuid;
+        }
+
+        public String getAppId() {
+            return appId;
+        }
+
+        public void setAppId(String appId) {
+            this.appId = appId;
+        }
+
+        @Override
+        public String getType() {
+            return UNINSTALL;
+        }
+    }
+
+    public static class StartOperation implements Operation{ // identifies the application to start by uuid and/or appId
+        private String uuid;
+        private String appId;
+        public StartOperation(){} // no-arg: both ids stay null; presumably needed to deserialize request bodies - TODO confirm
+        public StartOperation(String uuid){
+            this.setUuid(uuid);
+        }
+        public StartOperation(String uuid,String appId){
+            this.setUuid(uuid);
+            this.setAppId(appId);
+        }
+        public String getUuid() {
+            return uuid;
+        }
+        public void setUuid(String uuid) {
+            this.uuid = uuid;
+        }
+
+        public String getAppId() {
+            return appId;
+        }
+
+        public void setAppId(String appId) {
+            this.appId = appId;
+        }
+
+        @Override
+        public String getType() {
+            return START;
+        }
+    }
+
+    public static class StopOperation implements Operation{ // identifies the application to stop by uuid and/or appId
+        private String uuid;
+        private String appId;
+
+        public StopOperation(){} // no-arg: both ids stay null; presumably needed to deserialize request bodies - TODO confirm
+        public StopOperation(String uuid){
+            this.setUuid(uuid);
+        }
+        public StopOperation(String uuid,String appId){
+            this.setUuid(uuid);
+            this.setAppId(appId);
+        }
+        public String getUuid() {
+            return uuid;
+        }
+        public void setUuid(String uuid) {
+            this.uuid = uuid;
+        }
+
+        public String getAppId() {
+            return appId;
+        }
+
+        public void setAppId(String appId) {
+            this.appId = appId;
+        }
+
+        @Override
+        public String getType() {
+            return STOP;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderLoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderLoader.java
index 8b98404..85f0709 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderLoader.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderLoader.java
@@ -17,8 +17,8 @@
 package org.apache.eagle.app.service;
 
 import com.typesafe.config.Config;
-import org.apache.eagle.app.service.loader.ApplicationProviderConfigLoader;
-import org.apache.eagle.app.service.loader.ApplicationProviderSPILoader;
+import org.apache.eagle.app.service.impl.ApplicationProviderConfigLoader;
+import org.apache.eagle.app.service.impl.ApplicationProviderSPILoader;
 import org.apache.eagle.app.spi.ApplicationProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderServiceImpl.java
deleted file mode 100644
index 6ce463f..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationProviderServiceImpl.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.spi.ApplicationProvider;
-import org.apache.eagle.metadata.model.ApplicationDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.stream.Collectors;
-
-/**
- * Support to load application provider from application.provider.config = "providers.xml" configuration file
- * or application.provider.dir = "lib/apps" with SPI Class loader
- *
- * TODO: hot-manage application provider loading
- */
-@Singleton
-public class ApplicationProviderServiceImpl implements ApplicationProviderService {
-    private final Config config;
-    private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderServiceImpl.class);
-    private final ApplicationProviderLoader appProviderLoader;
-    public final static String APP_PROVIDER_LOADER_CLASS_KEY = "application.provider.loader";
-
-    @Inject
-    public ApplicationProviderServiceImpl(Config config){
-        LOG.info("Initializing {}",this.getClass().getCanonicalName());
-        this.config = config;
-        String appProviderLoaderClass = this.config.hasPath(APP_PROVIDER_LOADER_CLASS_KEY)?
-                this.config.getString(APP_PROVIDER_LOADER_CLASS_KEY):ApplicationProviderLoader.getDefaultAppProviderLoader();
-        LOG.info("Initializing {} = {}",APP_PROVIDER_LOADER_CLASS_KEY,appProviderLoaderClass);
-        appProviderLoader = initializeAppProviderLoader(appProviderLoaderClass);
-        LOG.info("Initialized {}",appProviderLoader);
-        reload();
-    }
-
-    private ApplicationProviderLoader initializeAppProviderLoader(String appProviderLoaderClass){
-        try {
-            return (ApplicationProviderLoader) Class.forName(appProviderLoaderClass).getConstructor(Config.class).newInstance(this.config);
-        } catch (Throwable e) {
-            LOG.error("Failed to initialize ApplicationProviderLoader: "+appProviderLoaderClass,e);
-            throw new IllegalStateException("Failed to initialize ApplicationProviderLoader: "+appProviderLoaderClass,e);
-        }
-    }
-
-    public synchronized void reload(){
-        appProviderLoader.reset();
-        LOG.info("Loading application providers ...");
-        appProviderLoader.load();
-        LOG.info("Loaded {} application providers",appProviderLoader.getProviders().size());
-    }
-
-    public Collection<ApplicationProvider> getProviders(){
-        return appProviderLoader.getProviders();
-    }
-
-    public Collection<ApplicationDesc> getApplicationDescs(){
-        return getProviders().stream().map(ApplicationProvider::getApplicationDesc).collect(Collectors.toList());
-    }
-
-    public ApplicationProvider<?> getApplicationProviderByType(String type) {
-        return appProviderLoader.getApplicationProviderByType(type);
-    }
-
-    @Deprecated
-    public ApplicationDesc getApplicationDescByType(String appType) {
-        return appProviderLoader.getApplicationProviderByType(appType).getApplicationDesc();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
new file mode 100644
index 0000000..9e34459
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service.impl;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.ApplicationContext;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.service.ApplicationManagementService;
+import org.apache.eagle.app.service.ApplicationProviderService;
+import org.apache.eagle.metadata.model.ApplicationDesc;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.model.SiteEntity;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+import org.apache.eagle.metadata.service.SiteEntityService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default {@link ApplicationManagementService} implementation.
+ *
+ * <p>Each operation resolves the target {@link ApplicationEntity}, wraps it in an
+ * {@link ApplicationContext} built from the provider registered for the entity's
+ * descriptor type, and invokes the matching lifecycle callback. Install and uninstall
+ * additionally create or delete the entity through the {@link ApplicationEntityService}.</p>
+ */
+@Singleton
+public class ApplicationManagementServiceImpl implements ApplicationManagementService {
+    private final SiteEntityService siteEntityService;
+    private final ApplicationProviderService applicationProviderService;
+    private final ApplicationEntityService applicationEntityService;
+    private final Config config;
+    private final static Logger LOGGER = LoggerFactory.getLogger(ApplicationManagementServiceImpl.class);
+
+    @Inject
+    public ApplicationManagementServiceImpl(
+            Config config,
+            SiteEntityService siteEntityService,
+            ApplicationProviderService applicationProviderService,
+            ApplicationEntityService applicationEntityService){
+        this.config = config;
+        this.siteEntityService = siteEntityService;
+        this.applicationProviderService = applicationProviderService;
+        this.applicationEntityService = applicationEntityService;
+    }
+
+    /**
+     * Installs an application of the requested type on the requested site.
+     *
+     * <p>The stream descriptors reported by the application context are stored on the
+     * entity, the install callback is invoked, and only then is the entity persisted.</p>
+     *
+     * @param operation install request; site id and application type are mandatory
+     * @return the newly created application entity
+     * @throws NullPointerException if the site id or application type is missing, or if
+     *                              no site or application descriptor matches them
+     */
+    public ApplicationEntity install(ApplicationOperations.InstallOperation operation) {
+        Preconditions.checkNotNull(operation.getSiteId(),"siteId is null");
+        Preconditions.checkNotNull(operation.getAppType(),"appType is null");
+        SiteEntity siteEntity = siteEntityService.getBySiteId(operation.getSiteId());
+        Preconditions.checkNotNull(siteEntity,"Site with ID: "+operation.getSiteId()+" is not found");
+        ApplicationDesc appDesc = applicationProviderService.getApplicationDescByType(operation.getAppType());
+        // The reference under test must be the first argument: passing only the message
+        // checks a non-null String and therefore can never fail.
+        Preconditions.checkNotNull(appDesc,"Application with TYPE: "+operation.getAppType()+" is not found");
+        ApplicationEntity applicationEntity = new ApplicationEntity();
+        applicationEntity.setDescriptor(appDesc);
+        applicationEntity.setSite(siteEntity);
+        applicationEntity.setConfiguration(operation.getConfiguration());
+        applicationEntity.setMode(operation.getMode());
+        ApplicationContext applicationContext = createContext(applicationEntity);
+        applicationEntity.setStreams(applicationContext.getStreamSinkDescs());
+        applicationContext.onAppInstall();
+        applicationEntityService.create(applicationEntity);
+        return applicationEntity;
+    }
+
+    /**
+     * Stops (best effort) and uninstalls an application, then deletes its entity.
+     *
+     * @param operation uninstall request identifying the application by UUID or app id
+     * @return whatever {@link ApplicationEntityService#delete} returns for the entity
+     * @throws NullPointerException if the lookup yields no entity
+     */
+    public ApplicationEntity uninstall(ApplicationOperations.UninstallOperation operation) {
+        ApplicationEntity applicationEntity = requireFound(
+                applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId()),
+                operation.getUuid(),operation.getAppId());
+        ApplicationContext applicationContext = createContext(applicationEntity);
+        // TODO: Check status, skip stop if already STOPPED
+        try {
+            applicationContext.onAppStop();
+        }catch (Throwable throwable){
+            // A failing stop must not block the uninstall, so it is only logged.
+            LOGGER.error(throwable.getMessage(),throwable);
+        }
+        applicationContext.onAppUninstall();
+        return applicationEntityService.delete(applicationEntity);
+    }
+
+    /**
+     * Invokes the start callback of an installed application.
+     *
+     * @param operation start request identifying the application by UUID or app id
+     * @return the entity the operation was applied to
+     * @throws NullPointerException if the lookup yields no entity
+     */
+    public ApplicationEntity start(ApplicationOperations.StartOperation operation) {
+        ApplicationEntity applicationEntity = requireFound(
+                applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId()),
+                operation.getUuid(),operation.getAppId());
+        createContext(applicationEntity).onAppStart();
+        return applicationEntity;
+    }
+
+    /**
+     * Invokes the stop callback of an installed application.
+     *
+     * @param operation stop request identifying the application by UUID or app id
+     * @return the entity the operation was applied to
+     * @throws NullPointerException if the lookup yields no entity
+     */
+    public ApplicationEntity stop(ApplicationOperations.StopOperation operation) {
+        ApplicationEntity applicationEntity = requireFound(
+                applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId()),
+                operation.getUuid(),operation.getAppId());
+        createContext(applicationEntity).onAppStop();
+        return applicationEntity;
+    }
+
+    /** Builds the context for an entity using the provider registered for its descriptor type. */
+    private ApplicationContext createContext(ApplicationEntity applicationEntity) {
+        return new ApplicationContext(
+                applicationEntity,
+                applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()),
+                config);
+    }
+
+    /**
+     * Fails with a descriptive message instead of a bare NullPointerException further down.
+     * NOTE(review): only matters if the entity service returns null for an unknown
+     * application rather than throwing itself - confirm against its implementation.
+     */
+    private static ApplicationEntity requireFound(ApplicationEntity applicationEntity, Object uuid, Object appId) {
+        return Preconditions.checkNotNull(applicationEntity,
+                "Application with UUID: "+uuid+" or APP ID: "+appId+" is not found");
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java
new file mode 100644
index 0000000..3b2bca9
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderConfigLoader.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service.impl;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.config.ApplicationProviderConfig;
+import org.apache.eagle.app.config.ApplicationProvidersConfig;
+import org.apache.eagle.app.service.ApplicationProviderLoader;
+import org.apache.eagle.app.spi.ApplicationProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.Unmarshaller;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+/**
+ * {@link ApplicationProviderLoader} that instantiates the providers listed in an XML
+ * classpath resource. The resource name comes from the {@code application.provider.config}
+ * setting and falls back to {@code providers.xml} when that setting is absent.
+ */
+public class ApplicationProviderConfigLoader extends ApplicationProviderLoader {
+    public final static String DEFAULT_APPLICATIONS_CONFIG_FILE = "providers.xml";
+    private final static String APPLICATIONS_CONFIG_PROPS_KEY = "application.provider.config";
+    private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderConfigLoader.class);
+
+    public ApplicationProviderConfigLoader(Config config) {
+        super(config);
+    }
+
+    /**
+     * Initializes every configured provider. A provider that fails to initialize is
+     * logged and skipped; the remaining ones are still processed.
+     */
+    @Override
+    public void load() {
+        final List<ApplicationProviderConfig> configs = loadProvidersFromProvidersConf();
+        final int total = configs.size();
+        int succeeded = 0;
+        int failed = 0;
+        for (ApplicationProviderConfig candidate : configs) {
+            try {
+                initializeProvider(candidate);
+                succeeded++;
+            } catch (Throwable ex) {
+                LOG.warn("Failed to initialized {}, ignored", candidate, ex);
+                failed++;
+            }
+        }
+        LOG.info("Loaded {} app providers (total: {}, failed: {})", succeeded, total, failed);
+    }
+
+    /**
+     * Tells whether the given provider configuration can be found on the classpath.
+     *
+     * @param applicationConfFile resource name, looked up as given and then with a leading "/"
+     * @return true if either lookup yields a resource
+     */
+    public static boolean appProviderConfExists(String applicationConfFile){
+        final InputStream stream = openResource(applicationConfFile);
+        if (stream == null) {
+            return false;
+        }
+        // The stream was only opened to probe for existence; release it right away.
+        try {
+            stream.close();
+        } catch (IOException e) {
+            LOG.debug(e.getMessage());
+        }
+        return true;
+    }
+
+    /** Opens the resource relative to this class, retrying with a leading "/" when that fails. */
+    private static InputStream openResource(String resourceName) {
+        final InputStream relative = ApplicationProviderConfigLoader.class.getResourceAsStream(resourceName);
+        if (relative != null) {
+            return relative;
+        }
+        return ApplicationProviderConfigLoader.class.getResourceAsStream("/" + resourceName);
+    }
+
+    /**
+     * Validates one provider entry, instantiates the provider through its no-arg
+     * constructor, prepares it and registers it.
+     */
+    private void initializeProvider(ApplicationProviderConfig providerConfig) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
+        LOG.info("Loading application provider {} from {}", providerConfig.getClassName(), providerConfig.getJarPath());
+        final String className = providerConfig.getClassName();
+        if (className == null) {
+            throw new RuntimeException("provider.classname is null: " + providerConfig);
+        }
+        if (providerConfig.getJarPath() == null) {
+            throw new RuntimeException("provider.jarpath is null: " + providerConfig);
+        }
+
+        final Class<?> loadedClass = Class.forName(className);
+        final boolean isProvider = ApplicationProvider.class.isAssignableFrom(loadedClass);
+        if (!isProvider) {
+            throw new RuntimeException("providerClassName is not implementation of " + ApplicationProvider.class.getCanonicalName());
+        }
+
+        final ApplicationProvider instance = (ApplicationProvider) loadedClass.newInstance();
+        instance.prepare(providerConfig, this.getConfig());
+        // Reject providers that cannot describe themselves before they are registered.
+        Preconditions.checkNotNull(instance.getApplicationDesc(), "appDesc is null");
+        Preconditions.checkNotNull(instance.getApplicationDesc().getType(), "type is null");
+        registerProvider(instance);
+    }
+
+    /**
+     * Unmarshals the provider list from the configured XML resource.
+     *
+     * @return the provider entries declared in the resource
+     * @throws RuntimeException wrapping any failure, including a missing resource
+     */
+    private List<ApplicationProviderConfig> loadProvidersFromProvidersConf() {
+        final String resourceName;
+        if (getConfig().hasPath(APPLICATIONS_CONFIG_PROPS_KEY)) {
+            resourceName = getConfig().getString(APPLICATIONS_CONFIG_PROPS_KEY);
+            LOG.info("Set {} = {}", APPLICATIONS_CONFIG_PROPS_KEY, resourceName);
+        } else {
+            resourceName = DEFAULT_APPLICATIONS_CONFIG_FILE;
+        }
+
+        InputStream stream = null;
+        try {
+            final Unmarshaller xmlReader = JAXBContext.newInstance(ApplicationProvidersConfig.class).createUnmarshaller();
+            stream = openResource(resourceName);
+            if (stream == null) {
+                LOG.error("Application provider configuration {} is not found", resourceName);
+            }
+            Preconditions.checkNotNull(stream, resourceName + " is not found");
+            final ApplicationProvidersConfig parsed = (ApplicationProvidersConfig) xmlReader.unmarshal(stream);
+            return parsed.getProviders();
+        } catch (Exception ex) {
+            LOG.error("Failed to load application provider configuration: {}", resourceName, ex);
+            throw new RuntimeException("Failed to load application provider configuration: " + resourceName, ex);
+        } finally {
+            if (stream != null) {
+                try {
+                    stream.close();
+                } catch (IOException e) {
+                    LOG.error(e.getMessage(), e);
+                }
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
new file mode 100644
index 0000000..3304286
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderSPILoader.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service.impl;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.app.config.ApplicationProviderConfig;
+import org.apache.eagle.app.service.ApplicationProviderLoader;
+import org.apache.eagle.app.spi.ApplicationProvider;
+import org.apache.eagle.app.tools.DynamicJarPathFinder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ServiceLoader;
+import java.util.function.Function;
+
+/**
+ * {@link ApplicationProviderLoader} that discovers providers through
+ * {@link ServiceLoader}. When {@code application.provider.dir} is configured, every
+ * {@code *.jar} file in that directory is scanned through its own class loader;
+ * otherwise the thread context class loader is scanned.
+ */
+public class ApplicationProviderSPILoader extends ApplicationProviderLoader{
+    private final String appProviderExtDir;
+    private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderSPILoader.class);
+    private final static String APPLICATIONS_DIR_PROPS_KEY = "application.provider.dir";
+
+    /**
+     * @param config application configuration; {@code application.provider.dir} is optional
+     */
+    public ApplicationProviderSPILoader(Config config) {
+        super(config);
+        if(config.hasPath(APPLICATIONS_DIR_PROPS_KEY)) {
+            this.appProviderExtDir = config.getString(APPLICATIONS_DIR_PROPS_KEY);
+        }else{
+            this.appProviderExtDir = null;
+        }
+
+        LOG.info("Using {}: {}",APPLICATIONS_DIR_PROPS_KEY,this.appProviderExtDir);
+
+    }
+
+    /**
+     * Discovers and registers providers. A jar that cannot be processed is logged and
+     * skipped so that the remaining jars are still scanned.
+     */
+    @Override
+    public void load() {
+        if(appProviderExtDir != null) {
+            LOG.info("Loading application providers from class loader of jars in {}", appProviderExtDir);
+            File loc = new File(appProviderExtDir);
+            File[] jarFiles = loc.listFiles(file -> file.getPath().toLowerCase().endsWith(".jar"));
+            if (jarFiles == null) {
+                // listFiles returns null when the path is not a directory or cannot be read;
+                // say so instead of silently loading nothing.
+                LOG.warn("{} is not a readable directory, no application providers loaded from it", appProviderExtDir);
+                return;
+            }
+            for (File jarFile : jarFiles) {
+                try {
+                    URL jarFileUrl = jarFile.toURI().toURL();
+                    LOG.debug("Loading ApplicationProvider from jar: {}", jarFileUrl.toString());
+                    // The loader is left open, as before: closing it would stop further
+                    // class loading for providers registered from this jar.
+                    URLClassLoader jarFileClassLoader = new URLClassLoader(new URL[]{jarFileUrl});
+                    loadProviderFromClassLoader(jarFileClassLoader, (applicationProvider) -> jarFileUrl.getPath());
+                } catch (Exception e) {
+                    LOG.warn("Failed to load application provider from jar {}", jarFile,e);
+                }
+            }
+        } else {
+            LOG.info("Loading application providers from context class loader");
+            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+            // Resolve the jar from the provider's own class. Using the class of the
+            // ApplicationProviderConfig object would always point at the jar that ships
+            // ApplicationProviderConfig, not at the provider's jar.
+            loadProviderFromClassLoader(classLoader,(applicationProvider) -> DynamicJarPathFinder.findPath(applicationProvider.getClass()));
+        }
+    }
+
+    /**
+     * Registers every provider that {@link ServiceLoader} finds through the given class loader.
+     *
+     * @param jarFileClassLoader class loader to scan for provider service declarations
+     * @param jarPathResolver    maps a discovered provider to the jar path stored in its config
+     */
+    private void loadProviderFromClassLoader(ClassLoader jarFileClassLoader, Function<ApplicationProvider,String> jarPathResolver){
+        ServiceLoader<ApplicationProvider> serviceLoader = ServiceLoader.load(ApplicationProvider.class, jarFileClassLoader);
+        for (ApplicationProvider applicationProvider : serviceLoader) {
+            try {
+                ApplicationProviderConfig providerConfig = new ApplicationProviderConfig();
+                providerConfig.setClassName(applicationProvider.getClass().getCanonicalName());
+                providerConfig.setJarPath(jarPathResolver.apply(applicationProvider));
+                applicationProvider.prepare(providerConfig, getConfig());
+                registerProvider(applicationProvider);
+            }catch (Throwable ex){
+                // One broken provider must not prevent the others from being registered.
+                LOG.warn("Failed to register application provider {}",applicationProvider,ex);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderServiceImpl.java
new file mode 100644
index 0000000..2297ea5
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationProviderServiceImpl.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.service.impl;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import com.typesafe.config.Config;
+import org.apache.eagle.app.service.ApplicationProviderLoader;
+import org.apache.eagle.app.service.ApplicationProviderService;
+import org.apache.eagle.app.spi.ApplicationProvider;
+import org.apache.eagle.metadata.model.ApplicationDesc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * {@link ApplicationProviderService} backed by a pluggable {@link ApplicationProviderLoader}.
+ *
+ * The loader class is taken from the "application.provider.loader" setting, or from
+ * {@link ApplicationProviderLoader#getDefaultAppProviderLoader()} when that setting is
+ * absent. Providers can thus come from the application.provider.config = "providers.xml"
+ * configuration file or from application.provider.dir = "lib/apps" with the SPI class loader.
+ *
+ * TODO: hot-manage application provider loading
+ */
+@Singleton
+public class ApplicationProviderServiceImpl implements ApplicationProviderService {
+    private final Config config;
+    private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderServiceImpl.class);
+    private final ApplicationProviderLoader appProviderLoader;
+    public final static String APP_PROVIDER_LOADER_CLASS_KEY = "application.provider.loader";
+
+    /**
+     * Creates the configured loader and performs the initial provider load.
+     *
+     * @param config application configuration
+     */
+    @Inject
+    public ApplicationProviderServiceImpl(Config config){
+        LOG.info("Initializing {}", this.getClass().getCanonicalName());
+        this.config = config;
+
+        final String loaderClassName;
+        if (this.config.hasPath(APP_PROVIDER_LOADER_CLASS_KEY)) {
+            loaderClassName = this.config.getString(APP_PROVIDER_LOADER_CLASS_KEY);
+        } else {
+            loaderClassName = ApplicationProviderLoader.getDefaultAppProviderLoader();
+        }
+        LOG.info("Initializing {} = {}", APP_PROVIDER_LOADER_CLASS_KEY, loaderClassName);
+
+        appProviderLoader = instantiateLoader(loaderClassName);
+        LOG.info("Initialized {}", appProviderLoader);
+        reload();
+    }
+
+    /**
+     * Reflectively builds the loader through its {@code (Config)} constructor.
+     *
+     * @throws IllegalStateException wrapping whatever made the instantiation fail
+     */
+    private ApplicationProviderLoader instantiateLoader(String loaderClassName){
+        try {
+            final Class<?> loaderClass = Class.forName(loaderClassName);
+            final Object loader = loaderClass.getConstructor(Config.class).newInstance(this.config);
+            return (ApplicationProviderLoader) loader;
+        } catch (Throwable e) {
+            LOG.error("Failed to initialize ApplicationProviderLoader: " + loaderClassName, e);
+            throw new IllegalStateException("Failed to initialize ApplicationProviderLoader: " + loaderClassName, e);
+        }
+    }
+
+    /** Resets the loader and loads all providers again. */
+    public synchronized void reload(){
+        appProviderLoader.reset();
+        LOG.info("Loading application providers ...");
+        appProviderLoader.load();
+        final int providerCount = appProviderLoader.getProviders().size();
+        LOG.info("Loaded {} application providers", providerCount);
+    }
+
+    /** @return the providers currently held by the loader */
+    public Collection<ApplicationProvider> getProviders(){
+        return appProviderLoader.getProviders();
+    }
+
+    /** @return a new list holding the descriptor of every loaded provider */
+    public Collection<ApplicationDesc> getApplicationDescs(){
+        final List<ApplicationDesc> descriptors = new ArrayList<>();
+        for (ApplicationProvider provider : getProviders()) {
+            descriptors.add(provider.getApplicationDesc());
+        }
+        return descriptors;
+    }
+
+    /**
+     * @param type application type to look up
+     * @return the loader's answer for that type
+     */
+    public ApplicationProvider<?> getApplicationProviderByType(String type) {
+        return appProviderLoader.getApplicationProviderByType(type);
+    }
+
+    /**
+     * @param appType application type to look up
+     * @return the descriptor of the provider the loader returns for that type
+     */
+    @Deprecated
+    public ApplicationDesc getApplicationDescByType(String appType) {
+        final ApplicationProvider<?> provider = appProviderLoader.getApplicationProviderByType(appType);
+        return provider.getApplicationDesc();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderConfigLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderConfigLoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderConfigLoader.java
deleted file mode 100644
index 11d9905..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderConfigLoader.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service.loader;
-
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import org.apache.eagle.app.config.ApplicationProviderConfig;
-import org.apache.eagle.app.config.ApplicationProvidersConfig;
-import org.apache.eagle.app.service.ApplicationProviderLoader;
-import org.apache.eagle.app.spi.ApplicationProvider;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.xml.bind.JAXBContext;
-import javax.xml.bind.Unmarshaller;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-
-public class ApplicationProviderConfigLoader extends ApplicationProviderLoader {
-    public final static String DEFAULT_APPLICATIONS_CONFIG_FILE = "providers.xml";
-    private final static String APPLICATIONS_CONFIG_PROPS_KEY = "application.provider.config";
-    private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderConfigLoader.class);
-    public ApplicationProviderConfigLoader(Config config) {
-        super(config);
-    }
-
-    @Override
-    public void load() {
-        List<ApplicationProviderConfig> applicationProviderConfigs = loadProvidersFromProvidersConf();
-        int totalCount = applicationProviderConfigs.size();
-        int loadedCount = 0,failedCount = 0;
-        for(ApplicationProviderConfig providerConfig: applicationProviderConfigs){
-            try {
-                initializeProvider(providerConfig);
-                loadedCount ++;
-            }catch (Throwable ex){
-                LOG.warn("Failed to initialized {}, ignored",providerConfig,ex);
-                failedCount ++;
-            }
-        }
-        LOG.info("Loaded {} app providers (total: {}, failed: {})",loadedCount,totalCount,failedCount);
-    }
-
-    public static boolean appProviderConfExists(String applicationConfFile){
-        InputStream is = ApplicationProviderConfigLoader.class.getResourceAsStream(applicationConfFile);
-        if(is == null){
-            is = ApplicationProviderConfigLoader.class.getResourceAsStream("/"+applicationConfFile);
-        }
-
-        if(is != null){
-            try {
-                return true;
-            } finally {
-                try {
-                    is.close();
-                } catch (IOException e) {
-                    LOG.debug(e.getMessage());
-                }
-            }
-        } else {
-            return false;
-        }
-    }
-
-    private void initializeProvider(ApplicationProviderConfig providerConfig) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
-        LOG.info("Loading application provider {} from {}",providerConfig.getClassName(),providerConfig.getJarPath());
-        String providerClassName = providerConfig.getClassName();
-        if(providerClassName == null) throw new RuntimeException("provider.classname is null: "+providerConfig);
-        if(providerConfig.getJarPath() == null) throw new RuntimeException("provider.jarpath is null: "+providerConfig);
-
-        Class<?> providerClass = Class.forName(providerClassName);
-
-        if(!ApplicationProvider.class.isAssignableFrom(providerClass)){
-            throw new RuntimeException("providerClassName is not implementation of "+ApplicationProvider.class.getCanonicalName());
-        }
-        ApplicationProvider provider = (ApplicationProvider) providerClass.newInstance();
-        provider.prepare(providerConfig,this.getConfig());
-        Preconditions.checkNotNull(provider.getApplicationDesc(),"appDesc is null");
-        Preconditions.checkNotNull(provider.getApplicationDesc().getType(),"type is null");
-        registerProvider(provider);
-    }
-
-    private List<ApplicationProviderConfig> loadProvidersFromProvidersConf() {
-        String providerConfigFile = DEFAULT_APPLICATIONS_CONFIG_FILE;
-        if(getConfig().hasPath(APPLICATIONS_CONFIG_PROPS_KEY)){
-            providerConfigFile = getConfig().getString(APPLICATIONS_CONFIG_PROPS_KEY);
-            LOG.info("Set {} = {}",APPLICATIONS_CONFIG_PROPS_KEY,providerConfigFile);
-        }
-        InputStream is = null;
-        try {
-            JAXBContext jc = JAXBContext.newInstance(ApplicationProvidersConfig.class);
-            Unmarshaller unmarshaller = jc.createUnmarshaller();
-            is = ApplicationProviderConfigLoader.class.getResourceAsStream(providerConfigFile);
-            if(is == null){
-                is = ApplicationProviderConfigLoader.class.getResourceAsStream("/"+providerConfigFile);
-            }
-            if(is == null){
-                LOG.error("Application provider configuration {} is not found",providerConfigFile);
-            }
-            Preconditions.checkNotNull(is,providerConfigFile+" is not found");
-            return ((ApplicationProvidersConfig) unmarshaller.unmarshal(is)).getProviders();
-        }catch (Exception ex){
-            LOG.error("Failed to load application provider configuration: {}",providerConfigFile,ex);
-            throw new RuntimeException("Failed to load application provider configuration: "+providerConfigFile,ex);
-        } finally {
-            if(is != null) try {
-                is.close();
-            } catch (IOException e) {
-                LOG.error(e.getMessage(),e);
-            }
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderSPILoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderSPILoader.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderSPILoader.java
deleted file mode 100644
index 33cb401..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/loader/ApplicationProviderSPILoader.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.service.loader;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.app.config.ApplicationProviderConfig;
-import org.apache.eagle.app.service.ApplicationProviderLoader;
-import org.apache.eagle.app.spi.ApplicationProvider;
-import org.apache.eagle.app.tools.DynamicJarPathFinder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.ServiceLoader;
-import java.util.function.Function;
-
-public class ApplicationProviderSPILoader extends ApplicationProviderLoader{
-    private final String appProviderExtDir;
-    private final static Logger LOG = LoggerFactory.getLogger(ApplicationProviderSPILoader.class);
-    private final static String APPLICATIONS_DIR_PROPS_KEY = "application.provider.dir";
-
-    public ApplicationProviderSPILoader(Config config) {
-        super(config);
-        if(config.hasPath(APPLICATIONS_DIR_PROPS_KEY)) {
-            this.appProviderExtDir = config.getString(APPLICATIONS_DIR_PROPS_KEY);
-        }else{
-            this.appProviderExtDir = null;
-        }
-
-        LOG.info("Using {}: {}",APPLICATIONS_DIR_PROPS_KEY,this.appProviderExtDir);
-
-    }
-
-    @Override
-    public void load() {
-        if(appProviderExtDir != null) {
-            LOG.info("Loading application providers from class loader of jars in {}", appProviderExtDir);
-            File loc = new File(appProviderExtDir);
-            File[] jarFiles = loc.listFiles(file -> file.getPath().toLowerCase().endsWith(".jar"));
-            if (jarFiles != null) {
-                for (File jarFile : jarFiles) {
-                    try {
-                        URL jarFileUrl = jarFile.toURI().toURL();
-                        LOG.debug("Loading ApplicationProvider from jar: {}", jarFileUrl.toString());
-                        URLClassLoader jarFileClassLoader = new URLClassLoader(new URL[]{jarFileUrl});
-                        loadProviderFromClassLoader(jarFileClassLoader, (applicationProviderConfig) -> jarFileUrl.getPath());
-                    } catch (Exception e) {
-                        LOG.warn("Failed to load application provider from jar {}", jarFile,e);
-                    }
-                }
-            }
-        } else {
-            LOG.info("Loading application providers from context class loader");
-            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-            loadProviderFromClassLoader(classLoader,(applicationProviderConfig) -> DynamicJarPathFinder.findPath(applicationProviderConfig.getClass()));
-        }
-    }
-
-    private void loadProviderFromClassLoader(ClassLoader jarFileClassLoader, Function<ApplicationProviderConfig,String> jarFileSupplier){
-        ServiceLoader<ApplicationProvider> serviceLoader = ServiceLoader.load(ApplicationProvider.class, jarFileClassLoader);
-        for (ApplicationProvider applicationProvider : serviceLoader) {
-            try {
-                ApplicationProviderConfig providerConfig = new ApplicationProviderConfig();
-                providerConfig.setClassName(applicationProvider.getClass().getCanonicalName());
-                providerConfig.setJarPath(jarFileSupplier.apply(providerConfig));
-                applicationProvider.prepare(providerConfig, getConfig());
-                registerProvider(applicationProvider);
-            }catch (Throwable ex){
-                LOG.warn("Failed to register application provider {}",applicationProvider,ex);
-            }
-        }
-    }
-}



[2/3] incubator-eagle git commit: [EAGLE-386] Refactor Application Framework Interfaces and StreamEventMapper

Posted by ha...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
new file mode 100644
index 0000000..322ff8b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/AbstractStreamSink.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicOutputCollector;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.apache.eagle.app.sink.mapper.StreamEventMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+
+/** Base sink bolt: maps each tuple to {@link StreamEvent}s via the configured mapper and passes them to onEvent. */
+public abstract class AbstractStreamSink extends StreamSink {
+    private final static Logger LOG = LoggerFactory.getLogger(AbstractStreamSink.class);
+    private final static String KEY_FIELD = "KEY";
+    private final static String VALUE_FIELD = "VALUE";
+    private StreamEventMapper streamEventMapper;
+
+    /** Sets the mapper used by {@link #execute} to turn tuples into events; returns this sink for chaining. */
+    public AbstractStreamSink setEventMapper(StreamEventMapper streamEventMapper){
+        this.streamEventMapper = streamEventMapper;
+        return this;
+    }
+
+    @Override
+    public void execute(Tuple input, BasicOutputCollector collector) {
+        try {
+            List<StreamEvent> streamEvents = streamEventMapper.map(input);
+            if(streamEvents!=null) {
+                streamEvents.forEach((streamEvent -> {
+                    try {
+                        onEvent(streamEvent);
+                    } catch (Exception e) {
+                        // Pass the exception as last argument so the stack trace reaches the log
+                        LOG.error("Failed to execute event {}", streamEvent, e);
+                        collector.reportError(e);
+                    }
+                }));
+            }
+        } catch (Exception e) {
+            // Reached when the mapper itself fails (including when no mapper has been set)
+            LOG.error("Failed to execute event {}", input, e);
+            collector.reportError(e);
+        }
+    }
+
+    /** Handles a single mapped event; an exception thrown here is logged and reported by {@link #execute}. */
+    protected abstract void onEvent(StreamEvent streamEvent);
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields(KEY_FIELD,VALUE_FIELD));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index eee2a70..1490368 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -26,32 +26,27 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 
-public class KafkaStreamSink extends StreamSink {
+public class KafkaStreamSink extends AbstractStreamSink {
     private final static Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
-    private final String topicId;
+    private String topicId;
 
-    public KafkaStreamSink(StreamDefinition streamDefinition, ApplicationContext applicationContext) {
-        super(streamDefinition, applicationContext);
-        this.topicId = String.format("EAGLE_%s_%s_%s",
-                applicationContext.getAppEntity().getSite().getSiteId(),
-                applicationContext.getAppEntity().getDescriptor().getType(),
+    @Override
+    public void init(StreamDefinition streamDefinition, ApplicationContext context) {
+        this.topicId = String.format("EAGLE_TOPIC_%s_%s_%s",
+                context.getAppEntity().getSite().getSiteId(),
+                context.getAppEntity().getDescriptor().getType(),
                 streamDefinition.getStreamId());
     }
 
     @Override
     public void prepare(Map stormConf, TopologyContext context) {
         super.prepare(stormConf, context);
-        ensureTopic();
         // TODO: Create KafkaProducer
     }
 
-    private void ensureTopic(){
-        LOGGER.info("TODO: ensure kafka topic {} created",this.topicId);
-    }
-
     @Override
     protected void onEvent(StreamEvent streamEvent) {
-        LOGGER.info("TODO: producing {}",streamEvent);
+        LOGGER.info("TODO: producing {} to '{}'",streamEvent,topicId);
     }
 
     @Override
@@ -62,4 +57,22 @@ public class KafkaStreamSink extends StreamSink {
             }
         };
     }
+
+    @Override
+    public void onAppInstall() {
+        ensureTopicCreated();
+    }
+
+    private void ensureTopicCreated(){
+        LOGGER.info("TODO: ensure kafka topic {} created",this.topicId);
+    }
+
+    private void ensureTopicDeleted(){
+        LOGGER.info("TODO: ensure kafka topic {} deleted",this.topicId);
+    }
+
+    @Override
+    public void onAppUninstall() {
+        ensureTopicDeleted();
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
index ca201a8..8acf325 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/LoggingStreamSink.java
@@ -25,10 +25,12 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.Map;
 
-public class LoggingStreamSink extends StreamSink {
+public class LoggingStreamSink extends AbstractStreamSink {
     private final static Logger LOGGER = LoggerFactory.getLogger(KafkaStreamSink.class);
-    public LoggingStreamSink(StreamDefinition streamDefinition, ApplicationContext applicationContext) {
-        super(streamDefinition, applicationContext);
+
+    @Override
+    public void init(StreamDefinition streamDefinition, ApplicationContext context) {
+        // do nothing
     }
 
     @Override
@@ -40,4 +42,14 @@ public class LoggingStreamSink extends StreamSink {
     public Map<String, Object> getSinkContext() {
         return new HashMap<>();
     }
+
+    @Override
+    public void onAppInstall() {
+        LOGGER.info("Executing onAppInstall callback, do nothing");
+    }
+
+    @Override
+    public void onAppUninstall() {
+        LOGGER.info("Executing onAppUninstall callback, do nothing");
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
index e0f2db2..2052484 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/StreamSink.java
@@ -1,82 +1,45 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.sink;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.model.StreamEvent;
-import org.apache.eagle.app.ApplicationContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Map;
-
-public abstract class StreamSink extends BaseBasicBolt {
-    private final static Logger LOG = LoggerFactory.getLogger(StreamSink.class);
-    public final static String KEY_FIELD = "KEY";
-    public final static String VALUE_FIELD = "VALUE";
-
-    public StreamSink(StreamDefinition streamDefinition,ApplicationContext applicationContext){
-        
-    }
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context) {
-        super.prepare(stormConf, context);
-    }
-
-    @Override
-    public void execute(Tuple input, BasicOutputCollector collector) {
-        List<Object> values = input.getValues();
-        Object inputValue;
-        if(values.size() == 1){
-            inputValue = values.get(0);
-        } else if(values.size() == 2){
-            inputValue = values.get(1);
-        } else{
-            collector.reportError(new IllegalStateException("Expect tuple in size of 1: <StreamEvent> or 2: <Object,StreamEvent>, but got "+values.size()+": "+values));
-            return;
-        }
-
-        if(inputValue instanceof StreamEvent){
-            try {
-                onEvent((StreamEvent) inputValue);
-            }catch (Exception e){
-                LOG.error("Failed to execute event {}",inputValue);
-                collector.reportError(e);
-            }
-        } else {
-            LOG.error("{} is not StreamEvent",inputValue);
-            collector.reportError(new IllegalStateException("Input tuple "+input+"is not type of StreamEvent"));
-        }
-    }
-
-    protected abstract void onEvent(StreamEvent streamEvent);
-
-    public abstract Map<String,Object> getSinkContext();
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields(KEY_FIELD,VALUE_FIELD));
-    }
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink;
+
+import backtype.storm.topology.base.BaseBasicBolt;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.app.ApplicationContext;
+import org.apache.eagle.app.ApplicationLifecycleListener;
+
+import java.util.Map;
+
+public abstract class StreamSink extends BaseBasicBolt implements ApplicationLifecycleListener {
+    /**
+     * Initializes sink metadata only; implementations must not open any resource such as a connection here.
+     *
+     * @param streamDefinition definition of the stream this sink is initialized for
+     * @param context application context this sink is initialized with
+     */
+    public abstract void init(StreamDefinition streamDefinition, ApplicationContext context);
+    public abstract Map<String,Object> getSinkContext();
+
+    @Override
+    public void onAppStart() {
+        // StreamSink by default does nothing when the application starts
+    }
+
+    @Override
+    public void onAppStop() {
+        // StreamSink by default does nothing when the application stops
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java
new file mode 100644
index 0000000..631e02a
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/DirectEventMapper.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+public interface DirectEventMapper extends StreamEventMapper{} // marker interface: declares no members of its own

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java
new file mode 100644
index 0000000..aa60414
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldIndexDirectEventMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Maps a tuple to the {@link StreamEvent}s found at the configured field positions, in the order given. */
+public class FieldIndexDirectEventMapper implements DirectEventMapper {
+    private final int[] fieldIndexs;
+    public FieldIndexDirectEventMapper(int ... fieldIndexs){
+        this.fieldIndexs = fieldIndexs;
+    }
+
+    @Override
+    public List<StreamEvent> map(Tuple tuple) throws Exception {
+        final List<StreamEvent> mapped = new ArrayList<>(fieldIndexs.length);
+        for (int i = 0; i < fieldIndexs.length; i++) {
+            mapped.add((StreamEvent) tuple.getValue(fieldIndexs[i]));
+        }
+        return mapped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java
new file mode 100644
index 0000000..bbb4a11
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FieldNameDirectEventMapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Maps a tuple to the {@link StreamEvent}s stored under the configured field names, in the order given. */
+public class FieldNameDirectEventMapper implements DirectEventMapper {
+    private final String[] fieldNames;
+    public FieldNameDirectEventMapper(String ... fieldNames){
+        this.fieldNames = fieldNames;
+    }
+
+    @Override
+    public List<StreamEvent> map(Tuple tuple) throws Exception {
+        final List<StreamEvent> mapped = new ArrayList<>(fieldNames.length);
+        for (int i = 0; i < fieldNames.length; i++) {
+            mapped.add((StreamEvent) tuple.getValueByField(fieldNames[i]));
+        }
+        return mapped;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java
new file mode 100644
index 0000000..cb8ee33
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/FlattenEventMapper.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Maps a whole tuple to a single {@link StreamEvent} holding one value per column of the stream definition. */
+public class FlattenEventMapper implements StreamEventMapper {
+    private final static Logger LOG = LoggerFactory.getLogger(FlattenEventMapper.class);
+    private final static String DEFAULT_TIMESTAMP_COLUMN_NAME = "timestamp";
+
+    private final StreamDefinition streamDefinition;
+    private final TimestampSelector timestampSelector;
+
+    public FlattenEventMapper(StreamDefinition streamDefinition, TimestampSelector timestampSelector){
+        this.streamDefinition = streamDefinition;
+        this.timestampSelector = timestampSelector;
+    }
+
+    /** Takes the event timestamp from the tuple field {@code timestampFieldName} via getLongByField. */
+    public FlattenEventMapper(StreamDefinition streamDefinition, String timestampFieldName){
+        this(streamDefinition, tuple -> tuple.getLongByField(timestampFieldName));
+    }
+
+    /** Takes the event timestamp from the tuple field named "timestamp". */
+    public FlattenEventMapper(StreamDefinition streamDefinition){
+        this(streamDefinition,DEFAULT_TIMESTAMP_COLUMN_NAME);
+    }
+
+    @Override
+    public List<StreamEvent> map(Tuple tuple) throws Exception {
+        Long timestamp = 0L;
+        try {
+            timestamp = timestampSelector.apply(tuple);
+        } catch (Throwable fieldNotExistException){
+            // Only time-series streams report the failure; either way the timestamp stays at its initial 0
+            if(streamDefinition.isTimeseries()) {
+                LOG.error("Stream (streamId = {}) is time series, but failed to detect timestamp, treating as {}", streamDefinition.getStreamId(), timestamp, fieldNotExistException);
+            }
+        }
+        StreamEvent flattened = new StreamEvent(streamDefinition.getStreamId(), timestamp,
+                this.streamDefinition.getColumns().stream().map(column -> readColumn(tuple, column.getName())).toArray());
+        return Collections.singletonList(flattened);
+    }
+
+    /** Returns the tuple value stored under {@code columnName}, or null when the lookup is rejected. */
+    private Object readColumn(Tuple tuple, String columnName) {
+        try {
+            return tuple.getValueByField(columnName);
+        } catch (IllegalArgumentException fieldNotExistException){
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Column '{}' of stream {} not exist in {}, treating as null", columnName, streamDefinition.getStreamId(), tuple, fieldNotExistException);
+            }
+            return null;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java
new file mode 100644
index 0000000..4f27423
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/StreamEventMapper.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.alert.engine.model.StreamEvent;
+
+import java.io.Serializable;
+import java.util.List;
+
+@FunctionalInterface
+public interface StreamEventMapper extends Serializable{
+    /**
+     * @param tuple storm tuple to convert
+     * @return stream events mapped from the tuple
+     * @throws Exception when the tuple cannot be mapped
+     */
+    List<StreamEvent> map(Tuple tuple) throws Exception;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java
new file mode 100644
index 0000000..cac66e7
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/mapper/TimestampSelector.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.sink.mapper;
+
+import backtype.storm.tuple.Tuple;
+
+import java.io.Serializable;
+import java.util.function.Function;
+
+@FunctionalInterface // serializable Function from a Storm Tuple to a Long; presumably the event timestamp - TODO confirm with callers
+public interface TimestampSelector extends Function<Tuple,Long>,Serializable{}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
index 37311eb..8d61066 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/spi/AbstractApplicationProvider.java
@@ -22,7 +22,7 @@ import org.apache.eagle.app.Application;
 import org.apache.eagle.app.config.ApplicationProviderConfig;
 import org.apache.eagle.app.config.ApplicationProviderDescConfig;
 import org.apache.eagle.app.sink.KafkaStreamSink;
-import org.apache.eagle.app.sink.StreamSink;
+import org.apache.eagle.app.sink.AbstractStreamSink;
 import org.apache.eagle.metadata.model.ApplicationDesc;
 import org.apache.eagle.metadata.model.ApplicationDocs;
 import org.apache.eagle.metadata.model.Configuration;
@@ -78,8 +78,8 @@ public abstract class AbstractApplicationProvider<T extends Application> impleme
                 envConfig.getString(APPLICATIONS_SINK_TYPE_PROPS_KEY) : DEFAULT_APPLICATIONS_SINK_TYPE;
         try {
             Class<?> sinkClass = Class.forName(sinkClassName);
-            if(!StreamSink.class.isAssignableFrom(sinkClass)){
-                throw new IllegalStateException(sinkClassName+ "is not assignable from "+StreamSink.class.getCanonicalName());
+            if(!AbstractStreamSink.class.isAssignableFrom(sinkClass)){
+                throw new IllegalStateException(sinkClassName+ "is not assignable from "+AbstractStreamSink.class.getCanonicalName());
             }
             applicationDesc.setSinkClass(sinkClass);
         } catch (ClassNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulator.java
deleted file mode 100644
index a3ef0ee..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.test;
-
-import org.apache.eagle.app.spi.ApplicationProvider;
-
-import java.util.Map;
-
-/**
- * Application test simulator for developer to quickly run application without diving into application lifecycle
- */
-public interface AppSimulator {
-    /**
-     *
-     * @param appType
-     */
-    void submit(String appType);
-    /**
-     *
-     * @param appType
-     * @param appConfig
-     */
-    void submit(String appType, Map<String,Object> appConfig);
-
-    /**
-     *
-     * @param appProviderClass
-     */
-    void submit(Class<? extends ApplicationProvider> appProviderClass);
-
-    /**
-     *
-     * @param appProviderClass
-     * @param appConfig
-     */
-    void submit(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java
index f5af815..2e4d33b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppSimulatorImpl.java
@@ -20,7 +20,7 @@ import com.google.inject.Inject;
 import com.typesafe.config.Config;
 import org.apache.eagle.app.config.ApplicationProviderConfig;
 import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.AppOperations;
+import org.apache.eagle.app.service.ApplicationOperations;
 import org.apache.eagle.app.spi.ApplicationProvider;
 import org.apache.eagle.app.tools.DynamicJarPathFinder;
 import org.apache.eagle.metadata.model.ApplicationEntity;
@@ -32,7 +32,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class AppSimulatorImpl implements AppSimulator {
+public class AppSimulatorImpl extends ApplicationSimulator {
     private final Config config;
     private final SiteResource siteResource;
     private final ApplicationResource applicationResource;
@@ -55,9 +55,9 @@ public class AppSimulatorImpl implements AppSimulator {
         siteResource.createSite(siteEntity);
         Assert.assertNotNull(siteEntity.getUuid());
         // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(new AppOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL));
+        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation(siteEntity.getSiteId(),appType, ApplicationEntity.Mode.LOCAL));
         // Start application
-        applicationResource.startApplication(new AppOperations.StartOperation(applicationEntity.getUuid()));
+        applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
     }
 
     private final static AtomicInteger incr = new AtomicInteger();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
index b2b0194..7882a9b 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/AppTestGuiceModule.java
@@ -20,7 +20,7 @@ import com.google.inject.AbstractModule;
 import com.google.inject.Singleton;
 import org.apache.eagle.app.ApplicationGuiceModule;
 import org.apache.eagle.common.module.CommonGuiceModule;
-import org.apache.eagle.metadata.persistence.MemoryMetadataStore;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
 
 public class AppTestGuiceModule extends AbstractModule{
     @Override
@@ -28,6 +28,6 @@ public class AppTestGuiceModule extends AbstractModule{
         install(new CommonGuiceModule());
         install(new ApplicationGuiceModule());
         install(new MemoryMetadataStore());
-        bind(AppSimulator.class).to(AppSimulatorImpl.class).in(Singleton.class);
+        bind(ApplicationSimulator.class).to(AppSimulatorImpl.class).in(Singleton.class);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
new file mode 100644
index 0000000..f8114d2
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/test/ApplicationSimulator.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.app.test;
+
+import com.google.inject.Guice;
+import com.google.inject.Module;
+import org.apache.eagle.app.spi.ApplicationProvider;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Application test simulator for developer to quickly run application without diving into application lifecycle
+ */
+public abstract class ApplicationSimulator {
+    /**
+     *
+     * @param appType
+     */
+    public abstract void submit(String appType);
+
+    /**
+     *
+     * @param appType
+     * @param appConfig
+     */
+    public abstract void submit(String appType, Map<String,Object> appConfig);
+
+    /**
+     *
+     * @param appProviderClass
+     */
+    public abstract void submit(Class<? extends ApplicationProvider> appProviderClass);
+
+    /**
+     *
+     * @param appProviderClass
+     * @param appConfig
+     */
+    public abstract void submit(Class<? extends ApplicationProvider> appProviderClass, Map<String,Object> appConfig) throws Exception;
+
+    public static ApplicationSimulator getInstance(){
+        return Guice.createInjector(new AppTestGuiceModule()).getInstance(ApplicationSimulator.class);
+    }
+
+    /**
+     * @param modules additional modules
+     * @return ApplicationSimulator instance
+     */
+    public static ApplicationSimulator getInstance(Module ... modules){
+        List<Module> contextModules = Arrays.asList(modules);
+        contextModules.add(new AppTestGuiceModule());
+        return Guice.createInjector(contextModules).getInstance(ApplicationSimulator.class);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/main/resources/applications.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/resources/applications.xml b/eagle-core/eagle-app/eagle-app-base/src/main/resources/applications.xml
deleted file mode 100644
index 5f67807..0000000
--- a/eagle-core/eagle-app/eagle-app-base/src/main/resources/applications.xml
+++ /dev/null
@@ -1,42 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<applications>
-    <application>
-        <type>EXAMPLE_APP</type>
-        <version>0.2.0</version>
-        <name>Example Application</name>
-        <jarPath>target/apache-eagle-example-app.jar</jarPath>
-        <className>org.apache.eagle.app.base.example.ExampleApplication</className>
-        <!-- 'view' provides UI elements like portal/widget/dashboard, etc. -->
-        <viewPath>webapp/app/example</viewPath>
-        <configuration>
-            <property>
-                <name>kafka.topic</name>
-                <value>hdfs_audit</value>
-                <description>Kafka Topic</description>
-            </property>
-            <property>
-                <name>zookeeper.server</name>
-                <displayName>Zookeeper Server</displayName>
-                <value>localhost:2181</value>
-                <description>Zookeeper Server address</description>
-            </property>
-        </configuration>
-    </application>
-</applications>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/ApplicationProviderServiceTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/ApplicationProviderServiceTest.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/ApplicationProviderServiceTest.java
index 61f7542..a3f0ba0 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/ApplicationProviderServiceTest.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/ApplicationProviderServiceTest.java
@@ -19,12 +19,11 @@ package org.apache.eagle.app;
 
 import com.google.inject.Guice;
 import com.google.inject.Injector;
-import org.apache.eagle.app.config.ApplicationProviderConfig;
 import org.apache.eagle.app.service.ApplicationProviderService;
 import org.apache.eagle.app.spi.ApplicationProvider;
 import org.apache.eagle.common.module.CommonGuiceModule;
 import org.apache.eagle.metadata.model.ApplicationDesc;
-import org.apache.eagle.metadata.persistence.MemoryMetadataStore;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java
index 90a2b90..d379185 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationImpl.java
@@ -28,45 +28,35 @@ import org.junit.Ignore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Arrays;
 import java.util.Map;
 
 @Ignore
 public class TestApplicationImpl extends AbstractApplication {
     private final static Logger LOG = LoggerFactory.getLogger(TestApplicationImpl.class);
-    public class RandomEventSpout extends BaseRichSpout {
-        SpoutOutputCollector _collector;
+    protected void buildApp(TopologyBuilder builder, ApplicationContext context) {
+        builder.setSpout("metric_spout", new RandomEventSpout(), 4);
+        builder.setBolt("sink_1",context.getFlattenStreamSink("SAMPLE_STREAM_1")).fieldsGrouping("metric_spout",new Fields("metric"));
+        builder.setBolt("sink_2",context.getFlattenStreamSink("SAMPLE_STREAM_2")).fieldsGrouping("metric_spout",new Fields("metric"));
+    }
 
-        @SuppressWarnings("rawtypes")
+    private class RandomEventSpout extends BaseRichSpout {
+        private SpoutOutputCollector _collector;
         @Override
-        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-            _collector = collector;
+        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+            _collector = spoutOutputCollector;
         }
 
         @Override
         public void nextTuple() {
-
-        }
-
-        @Override
-        public void ack(Object id) {
-            //Ignored
+            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
         }
 
         @Override
-        public void fail(Object id) {
-            _collector.emit(new Values(id), id);
+        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
         }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("key","event"));
-        }
-    }
-
-    protected void buildTopology(TopologyBuilder builder, ApplicationContext context) {
-        builder.setSpout("mockMetricSpout", new RandomEventSpout(), 4);
-        builder.setBolt("sink_1",context.getStreamSink("TEST_STREAM_1")).fieldsGrouping("mockMetricSpout",new Fields("key"));
-        builder.setBolt("sink_2",context.getStreamSink("TEST_STREAM_2")).fieldsGrouping("mockMetricSpout",new Fields("key"));
     }
 
     public static class Provider extends AbstractApplicationProvider<TestApplicationImpl> {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
index f86f903..10fcffc 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/java/org/apache/eagle/app/TestApplicationTestSuite.java
@@ -18,8 +18,8 @@ package org.apache.eagle.app;
 
 import com.google.inject.Inject;
 import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.AppOperations;
-import org.apache.eagle.app.test.AppSimulator;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.test.ApplicationSimulator;
 import org.apache.eagle.app.test.AppUnitTestRunner;
 import org.apache.eagle.metadata.model.ApplicationDesc;
 import org.apache.eagle.metadata.model.ApplicationEntity;
@@ -35,7 +35,7 @@ import java.util.Collection;
 public class TestApplicationTestSuite {
     @Inject private SiteResource siteResource;
     @Inject private ApplicationResource applicationResource;
-    @Inject private AppSimulator simulator;
+    @Inject private ApplicationSimulator simulator;
 
     @Test
     public void testApplicationProviderLoading(){
@@ -55,13 +55,13 @@ public class TestApplicationTestSuite {
         Assert.assertNotNull(siteEntity.getUuid());
 
         // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(new AppOperations.InstallOperation("test_site","TEST_APPLICATION", ApplicationEntity.Mode.LOCAL));
+        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","TEST_APPLICATION", ApplicationEntity.Mode.LOCAL));
         // Start application
-        applicationResource.startApplication(new AppOperations.StartOperation(applicationEntity.getUuid()));
+        applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
         // Stop application
-        applicationResource.stopApplication(new AppOperations.StopOperation(applicationEntity.getUuid()));
+        applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
         // Uninstall application
-        applicationResource.uninstallApplication(new AppOperations.UninstallOperation(applicationEntity.getUuid()));
+        applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
         try {
             applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
             Assert.fail("Application instance (UUID: "+applicationEntity.getUuid()+") should have been uninstalled");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml b/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml
index 36a64d0..4582ed1 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml
@@ -44,6 +44,10 @@
                     <type>string</type>
                 </column>
                 <column>
+                    <name>source</name>
+                    <type>string</type>
+                </column>
+                <column>
                     <name>value</name>
                     <type>double</type>
                     <defaultValue>0.0</defaultValue>
@@ -61,6 +65,10 @@
                     <type>string</type>
                 </column>
                 <column>
+                    <name>source</name>
+                    <type>string</type>
+                </column>
+                <column>
                     <name>value</name>
                     <type>double</type>
                     <defaultValue>0.0</defaultValue>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
index 49654c0..ac0b76c 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
@@ -40,7 +40,7 @@
 		}
 	},
 	"metadata":{
-		"store": "org.apache.eagle.metadata.persistence.MemoryMetadataStore"
+		"store": "org.apache.eagle.metadata.service.memory.MemoryMetadataStore"
 	},
 	"application":{
 		"sink":{
@@ -48,7 +48,7 @@
 			"boostrap.server":"localhost:9092"
 		}
 		"provider":{
-			"loader":"org.apache.eagle.app.service.loader.ApplicationProviderSPILoader"
+			"loader": "org.apache.eagle.app.service.impl.ApplicationProviderSPILoader"
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml b/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml
index 5e74a41..ffe8c73 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/pom.xml
@@ -49,5 +49,9 @@
             <groupId>com.google.inject.extensions</groupId>
             <artifactId>guice-servlet</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.dropwizard</groupId>
+            <artifactId>dropwizard-servlets</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
index 09377ab..852041a 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/ApplicationEntity.java
@@ -79,7 +79,7 @@ public class ApplicationEntity extends PersistenceEntity {
     public void ensureDefault() {
         super.ensureDefault();
         if(this.appId == null){
-            this.appId = String.format("EAGLE_APP_%s_%s",this.getSite().getSiteId(),this.getDescriptor().getType());
+            this.appId = String.format("EAGLE_APP[TYPE=%s,SITE=%s]",this.getDescriptor().getType(),this.getSite().getSiteId());
         }
         if(this.status == null){
             this.status = Status.INITIAILIZED;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java
new file mode 100644
index 0000000..42d7603
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/StreamSinkDesc.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metadata.model;
+
+import java.util.Map;
+
+public class StreamSinkDesc { // mutable bean pairing a sink type with a free-form context map
+    private Class<?> sinkType; // presumably the stream sink implementation class - TODO confirm; null until set
+    private Map<String,Object> sinkContext; // presumably sink-specific settings - TODO confirm; null until set
+
+    public Class<?> getSinkType() {
+        return sinkType;
+    }
+
+    public void setSinkType(Class<?> sinkType) {
+        this.sinkType = sinkType;
+    }
+
+    public Map<String, Object> getSinkContext() { // returns the stored map itself, not a copy
+        return sinkContext;
+    }
+
+    public void setSinkContext(Map<String, Object> sinkContext) { // stores the caller's map by reference
+        this.sinkContext = sinkContext;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MemoryMetadataStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MemoryMetadataStore.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MemoryMetadataStore.java
deleted file mode 100644
index 5e3c2f4..0000000
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MemoryMetadataStore.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.metadata.persistence;
-
-import org.apache.eagle.alert.metadata.IMetadataDao;
-import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl;
-import org.apache.eagle.metadata.service.ApplicationEntityService;
-import org.apache.eagle.metadata.service.SiteEntityService;
-import org.apache.eagle.metadata.service.memory.ApplicationEntityServiceMemoryImpl;
-import org.apache.eagle.metadata.service.memory.SiteEntityEntityServiceMemoryImpl;
-
-public class MemoryMetadataStore extends MetadataStore {
-    @Override
-    protected void configure() {
-        bind(SiteEntityService.class).to(SiteEntityEntityServiceMemoryImpl.class);
-        bind(ApplicationEntityService.class).to(ApplicationEntityServiceMemoryImpl.class);
-        bind(IMetadataDao.class).to(InMemMetadataDaoImpl.class);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MetadataStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MetadataStore.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MetadataStore.java
index 71b1476..0ad8c87 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MetadataStore.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/persistence/MetadataStore.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
@@ -19,6 +19,7 @@ package org.apache.eagle.metadata.persistence;
 import com.google.inject.AbstractModule;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.metadata.service.memory.MemoryMetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,6 +28,7 @@ public abstract class MetadataStore extends AbstractModule {
     public static final String METADATA_STORE_CONFIG_KEY = "metadata.store";
 
     private static MetadataStore instance;
+
     public static MetadataStore getInstance(){
         String metadataStoreClass = null;
         if(instance == null) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java
new file mode 100644
index 0000000..0bdb199
--- /dev/null
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/service/memory/MemoryMetadataStore.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.metadata.service.memory;
+
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl;
+import org.apache.eagle.metadata.persistence.MetadataStore;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+import org.apache.eagle.metadata.service.SiteEntityService;
+
+public class MemoryMetadataStore extends MetadataStore {
+    @Override
+    protected void configure() {
+        bind(SiteEntityService.class).to(SiteEntityEntityServiceMemoryImpl.class);
+        bind(ApplicationEntityService.class).to(ApplicationEntityServiceMemoryImpl.class);
+        bind(IMetadataDao.class).to(InMemMetadataDaoImpl.class);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/pom.xml b/eagle-examples/eagle-app-example/pom.xml
index 45c887c..5d00fc8 100644
--- a/eagle-examples/eagle-app-example/pom.xml
+++ b/eagle-examples/eagle-app-example/pom.xml
@@ -26,17 +26,11 @@
     <modelVersion>4.0.0</modelVersion>
     <artifactId>eagle-app-example</artifactId>
     <dependencies>
-
         <dependency>
             <groupId>org.apache.eagle</groupId>
             <artifactId>eagle-app-base</artifactId>
             <version>${project.version}</version>
         </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
-            <version>1.10.19</version>
-        </dependency>
     </dependencies>
     <build>
         <resources>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplication.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplication.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplication.java
index 71037b3..5e6e548 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplication.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplication.java
@@ -16,15 +16,41 @@
  */
 package org.apache.eagle.app.example;
 
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichSpout;
 import backtype.storm.tuple.Fields;
-import org.apache.eagle.app.ApplicationContext;
 import org.apache.eagle.app.AbstractApplication;
+import org.apache.eagle.app.ApplicationContext;
+
+import java.util.Arrays;
+import java.util.Map;
 
 public class ExampleApplication extends AbstractApplication {
-    protected void buildTopology(TopologyBuilder builder, ApplicationContext context) {
-        builder.setSpout("mockMetricSpout", new RandomEventSpout(), 4);
-        builder.setBolt("sink_1",context.getStreamSink("SAMPLE_STREAM_1")).fieldsGrouping("mockMetricSpout",new Fields("key"));
-        builder.setBolt("sink_2",context.getStreamSink("SAMPLE_STREAM_2")).fieldsGrouping("mockMetricSpout",new Fields("key"));
+    protected void buildApp(TopologyBuilder builder, ApplicationContext context) {
+        builder.setSpout("metric_spout", new RandomEventSpout(), 4);
+        builder.setBolt("sink_1",context.getFlattenStreamSink("SAMPLE_STREAM_1")).fieldsGrouping("metric_spout",new Fields("metric"));
+        builder.setBolt("sink_2",context.getFlattenStreamSink("SAMPLE_STREAM_2")).fieldsGrouping("metric_spout",new Fields("metric"));
+    }
+
+    private class RandomEventSpout extends BaseRichSpout {
+        private SpoutOutputCollector _collector;
+        @Override
+        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
+            _collector = spoutOutputCollector;
+        }
+
+        @Override
+        public void nextTuple() {
+            _collector.emit(Arrays.asList("disk.usage",System.currentTimeMillis(),"host_1",56.7));
+            _collector.emit(Arrays.asList("cpu.usage",System.currentTimeMillis(),"host_2",99.8));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
+            outputFieldsDeclarer.declare(new Fields("metric","timestamp","source","value"));
+        }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
index bd4e9b1..5949853 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider.java
@@ -47,9 +47,8 @@ public class ExampleApplicationProvider extends AbstractApplicationProvider<Exam
         sampleStreamDefinition.setDescription("Auto generated sample Schema for "+streamId);
         List<StreamColumn> streamColumns = new ArrayList<>();
 
-        streamColumns.add(new StreamColumn.Builder().name("name").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("host").type(StreamColumn.Type.STRING).build());
-        streamColumns.add(new StreamColumn.Builder().name("flag").type(StreamColumn.Type.BOOL).build());
+        streamColumns.add(new StreamColumn.Builder().name("metric").type(StreamColumn.Type.STRING).build());
+        streamColumns.add(new StreamColumn.Builder().name("source").type(StreamColumn.Type.STRING).build());
         streamColumns.add(new StreamColumn.Builder().name("timestamp").type(StreamColumn.Type.LONG).build());
         streamColumns.add(new StreamColumn.Builder().name("value").type(StreamColumn.Type.DOUBLE).build());
         sampleStreamDefinition.setColumns(streamColumns);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider2.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider2.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider2.java
index f4b3f2f..ca5a69b 100644
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider2.java
+++ b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/ExampleApplicationProvider2.java
@@ -23,7 +23,7 @@ import org.apache.eagle.app.spi.AbstractApplicationProvider;
  */
 public class ExampleApplicationProvider2 extends AbstractApplicationProvider<ExampleApplication> {
     public ExampleApplicationProvider2() {
-        super("ExampleApplicationMetadata.xml");
+        super("/META-INF/apps/example/metadata.xml");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/RandomEventSpout.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/RandomEventSpout.java b/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/RandomEventSpout.java
deleted file mode 100644
index ac507b6..0000000
--- a/eagle-examples/eagle-app-example/src/main/java/org/apache/eagle/app/example/RandomEventSpout.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.app.example;
-
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class RandomEventSpout extends BaseRichSpout {
-        private final static Logger LOG = LoggerFactory.getLogger(RandomEventSpout.class);
-        SpoutOutputCollector _collector;
-
-        @SuppressWarnings("rawtypes")
-        @Override
-        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-            _collector = collector;
-        }
-
-        @Override
-        public void nextTuple() {
-
-        }
-
-        @Override
-        public void ack(Object id) {
-            //Ignored
-        }
-
-        @Override
-        public void fail(Object id) {
-            _collector.emit(new Values(id), id);
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("key","event"));
-        }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/resources/ExampleApplicationMetadata.xml
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/resources/ExampleApplicationMetadata.xml b/eagle-examples/eagle-app-example/src/main/resources/ExampleApplicationMetadata.xml
index b61ed54..0e5536c 100644
--- a/eagle-examples/eagle-app-example/src/main/resources/ExampleApplicationMetadata.xml
+++ b/eagle-examples/eagle-app-example/src/main/resources/ExampleApplicationMetadata.xml
@@ -42,6 +42,10 @@
                     <type>string</type>
                 </column>
                 <column>
+                    <name>source</name>
+                    <type>string</type>
+                </column>
+                <column>
                     <name>value</name>
                     <type>double</type>
                     <defaultValue>0.0</defaultValue>
@@ -59,6 +63,10 @@
                     <type>string</type>
                 </column>
                 <column>
+                    <name>source</name>
+                    <type>string</type>
+                </column>
+                <column>
                     <name>value</name>
                     <type>double</type>
                     <defaultValue>0.0</defaultValue>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/main/resources/META-INF/apps/example/metadata.xml
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/main/resources/META-INF/apps/example/metadata.xml b/eagle-examples/eagle-app-example/src/main/resources/META-INF/apps/example/metadata.xml
new file mode 100644
index 0000000..0e5536c
--- /dev/null
+++ b/eagle-examples/eagle-app-example/src/main/resources/META-INF/apps/example/metadata.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<application>
+    <type>EXAMPLE_APPLICATION_2</type>
+    <name>Example Monitoring Application</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.app.example.ExampleApplication</appClass>
+    <viewPath>/apps/example</viewPath>
+    <configuration>
+        <property>
+            <name>message</name>
+            <displayName>Message</displayName>
+            <value>Hello, example application!</value>
+            <description>Just a sample configuration property</description>
+        </property>
+    </configuration>
+    <streams>
+        <stream>
+            <streamId>SAMPLE_STREAM_1</streamId>
+            <description>Sample output stream #1</description>
+            <validate>true</validate>
+            <timeseries>true</timeseries>
+            <columns>
+                <column>
+                    <name>metric</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>source</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>value</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
+                </column>
+            </columns>
+        </stream>
+        <stream>
+            <streamId>SAMPLE_STREAM_2</streamId>
+            <description>Sample output stream #2</description>
+            <validate>true</validate>
+            <timeseries>true</timeseries>
+            <columns>
+                <column>
+                    <name>metric</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>source</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>value</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
+                </column>
+            </columns>
+        </stream>
+    </streams>
+    <docs>
+        <install>
+# Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1
+
+# Step 2: Set up data collector to flow data into the kafka topic
+
+./bin/logstash -f log_collector.conf
+
+## `log_collector.conf` sample as follows:
+
+input {
+
+}
+filter {
+
+}
+output{
+
+}
+
+# Step 3: start application
+
+# Step 4: monitor with featured portal or alert with policies
+        </install>
+        <uninstall>
+# Step 1: stop and uninstall application
+# Step 2: delete kafka topic named "${site}_example_source_topic"
+# Step 3: stop logstash
+        </uninstall>
+    </docs>
+</application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
index 3002025..f29c6ff 100644
--- a/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
+++ b/eagle-examples/eagle-app-example/src/test/java/org/apache/eagle/app/example/ExampleApplicationTest.java
@@ -18,8 +18,8 @@ package org.apache.eagle.app.example;
 
 import com.google.inject.Inject;
 import org.apache.eagle.app.resource.ApplicationResource;
-import org.apache.eagle.app.service.AppOperations;
-import org.apache.eagle.app.test.AppSimulator;
+import org.apache.eagle.app.service.ApplicationOperations;
+import org.apache.eagle.app.test.ApplicationSimulator;
 import org.apache.eagle.app.test.AppUnitTestRunner;
 import org.apache.eagle.metadata.model.ApplicationDesc;
 import org.apache.eagle.metadata.model.ApplicationEntity;
@@ -35,7 +35,7 @@ import java.util.Collection;
 public class ExampleApplicationTest {
     @Inject private SiteResource siteResource;
     @Inject private ApplicationResource applicationResource;
-    @Inject private AppSimulator simulator;
+    @Inject private ApplicationSimulator simulator;
 
     @Test
     public void testApplicationProviderLoading(){
@@ -64,13 +64,13 @@ public class ExampleApplicationTest {
         Assert.assertNotNull(siteEntity.getUuid());
 
         // Install application
-        ApplicationEntity applicationEntity = applicationResource.installApplication(new AppOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL));
+        ApplicationEntity applicationEntity = applicationResource.installApplication(new ApplicationOperations.InstallOperation("test_site","EXAMPLE_APPLICATION", ApplicationEntity.Mode.LOCAL));
         // Start application
-        applicationResource.startApplication(new AppOperations.StartOperation(applicationEntity.getUuid()));
+        applicationResource.startApplication(new ApplicationOperations.StartOperation(applicationEntity.getUuid()));
         // Stop application
-        applicationResource.stopApplication(new AppOperations.StopOperation(applicationEntity.getUuid()));
+        applicationResource.stopApplication(new ApplicationOperations.StopOperation(applicationEntity.getUuid()));
         // Uninstall application
-        applicationResource.uninstallApplication(new AppOperations.UninstallOperation(applicationEntity.getUuid()));
+        applicationResource.uninstallApplication(new ApplicationOperations.UninstallOperation(applicationEntity.getUuid()));
         try {
             applicationResource.getApplicationEntityByUUID(applicationEntity.getUuid());
             Assert.fail("Application instance (UUID: "+applicationEntity.getUuid()+") should have been uninstalled");
@@ -88,4 +88,18 @@ public class ExampleApplicationTest {
     public void testApplicationQuickRunWithAppProvider(){
         simulator.submit(ExampleApplicationProvider.class);
     }
+
+    @Test
+    public void testApplicationQuickRunWithAppProvider2(){
+        simulator.submit(ExampleApplicationProvider2.class);
+    }
+
+
+    /**
+     * For DEBUG
+     * @param args
+     */
+    public static void main(String[] args){
+        ApplicationSimulator.getInstance().submit(ExampleApplicationProvider.class);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-examples/eagle-app-example/src/test/java/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-examples/eagle-app-example/src/test/java/resources/application.conf b/eagle-examples/eagle-app-example/src/test/java/resources/application.conf
new file mode 100644
index 0000000..18677b2
--- /dev/null
+++ b/eagle-examples/eagle-app-example/src/test/java/resources/application.conf
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	"coordinator" : {
+		"policiesPerBolt" : 5,
+		"boltParallelism" : 5,
+		"policyDefaultParallelism" : 5,
+		"boltLoadUpbound": 0.8,
+		"topologyLoadUpbound" : 0.8,
+		"numOfAlertBoltsPerTopology" : 5,
+		"zkConfig" : {
+			"zkQuorum" : "127.0.0.1:2181",
+			"zkRoot" : "/alert",
+			"zkSessionTimeoutMs" : 10000,
+			"connectionTimeoutMs" : 10000,
+			"zkRetryTimes" : 3,
+			"zkRetryInterval" : 3000
+		},
+		"metadataService" : {
+			"host" : "localhost",
+			"port" : 8080,
+			"context" : "/rest"
+		},
+		"metadataDynamicCheck" : {
+			"initDelayMillis" : 1000,
+			"delayMillis" : 30000
+		}
+	},
+	"metadata":{
+		"store": "org.apache.eagle.metadata.service.memory.MemoryMetadataStore"
+	},
+	"application":{
+		"sink":{
+			"type": "org.apache.eagle.app.sink.KafkaStreamSink",
+			"config": {
+				"kafkaBrokerHost" : "",
+				"kafkaZkConnection" : ""
+			}
+		},
+		"storm": {
+			"nimbusHost": "localhost"
+			"nimbusThriftPort": 6627
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
index c64fa83..d7bd14a 100644
--- a/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
+++ b/eagle-server/src/main/java/org/apache/eagle/server/ServerApplication.java
@@ -33,11 +33,12 @@ import org.apache.eagle.metadata.persistence.MetadataStore;
 import javax.servlet.DispatcherType;
 import java.util.EnumSet;
 
-public class ServerApplication extends Application<ServerConfig> {
+class ServerApplication extends Application<ServerConfig> {
+    private GuiceBundle<ServerConfig> guiceBundle;
 
     @Override
     public void initialize(Bootstrap<ServerConfig> bootstrap) {
-        GuiceBundle<ServerConfig> guiceBundle = GuiceBundle.<ServerConfig>newBuilder()
+        guiceBundle = GuiceBundle.<ServerConfig>newBuilder()
                 .addModule(new CommonGuiceModule())
                 .addModule(MetadataStore.getInstance())
                 .addModule(new ApplicationGuiceModule())
@@ -63,11 +64,14 @@ public class ServerApplication extends Application<ServerConfig> {
 
         // Swagger resources
         environment.jersey().register(ApiListingResource.class);
+
         BeanConfig swaggerConfig = new BeanConfig();
         swaggerConfig.setTitle(ServerConfig.getServerName());
-        swaggerConfig.setVersion(ServerConfig.getServerName());
+        swaggerConfig.setVersion(ServerConfig.getServerVersion());
         swaggerConfig.setBasePath(ServerConfig.getApiBasePath());
         swaggerConfig.setResourcePackage(ServerConfig.getResourcePackage());
+        swaggerConfig.setLicense(ServerConfig.getLicense());
+        swaggerConfig.setLicenseUrl(ServerConfig.getLicenseUrl());
         swaggerConfig.setScan(true);
 
         // Simple CORS filter

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/java/org/apache/eagle/server/ServerConfig.java
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/java/org/apache/eagle/server/ServerConfig.java b/eagle-server/src/main/java/org/apache/eagle/server/ServerConfig.java
index 5ca9256..55c5c5d 100644
--- a/eagle-server/src/main/java/org/apache/eagle/server/ServerConfig.java
+++ b/eagle-server/src/main/java/org/apache/eagle/server/ServerConfig.java
@@ -20,33 +20,43 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import io.dropwizard.Configuration;
 
-public class ServerConfig extends Configuration {
+class ServerConfig extends Configuration {
     private final static String SERVER_NAME = "Apache Eagle";
     private final static String SERVER_VERSION = "0.5.0-incubating";
     private final static String API_BASE_PATH = "/rest/*";
     private final static String CONTEXT_PATH="/";
     private final static String RESOURCE_PACKAGE = "org.apache.eagle";
+    private final static String LICENSE = "Apache License (Version 2.0)";
+    private final static String LICENSE_URL = "http://www.apache.org/licenses/LICENSE-2.0";
 
     public Config getConfig(){
         return ConfigFactory.load();
     }
 
-    public static String getServerName(){
+    static String getServerName(){
         return SERVER_NAME;
     }
 
-    public static String getServerVersion(){
+    static String getServerVersion(){
         return SERVER_VERSION;
     }
 
-    public static String getApiBasePath(){
+    static String getApiBasePath(){
         return API_BASE_PATH;
     }
-    public static String getResourcePackage(){
+    static String getResourcePackage(){
         return RESOURCE_PACKAGE;
     }
 
-    public static String getContextPath(){
+    static String getContextPath(){
         return CONTEXT_PATH;
     }
+
+    public static String getLicense(){
+        return LICENSE;
+    }
+
+    static String getLicenseUrl(){
+        return LICENSE_URL;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e73e35da/eagle-server/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/resources/application.conf b/eagle-server/src/main/resources/application.conf
index 48a2723..19b87b2 100644
--- a/eagle-server/src/main/resources/application.conf
+++ b/eagle-server/src/main/resources/application.conf
@@ -39,16 +39,13 @@
 			"delayMillis" : 30000
 		}
 	},
-	"datastore": {
-		"metadataDao": "org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl"
-	}
 	"metadata":{
-		"store": "org.apache.eagle.metadata.persistence.MemoryMetadataStore"
+		"store": "org.apache.eagle.metadata.service.memory.MemoryMetadataStore"
 	},
 	"application":{
 		"sink":{
-			"type":"org.apache.eagle.app.sink.KafkaStreamSink"
-		}
+			"type": "org.apache.eagle.app.sink.KafkaStreamSink"
+		},
 		"storm": {
 			"nimbusHost": "localhost"
 			"nimbusThriftPort": 6627