You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/07/21 12:28:53 UTC

[09/11] incubator-eagle git commit: [EAGLE-382][EAGLE-385] Monitoring Application Framework Core

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-app-base/src/test/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-core/eagle-app/eagle-app-base/src/test/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..75096f8
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.app.TestApplicationImpl$Provider
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationConf.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationConf.xml b/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationConf.xml
new file mode 100644
index 0000000..4a31c5e
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationConf.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<configuration>
+    <property>
+        <name>kafka.topic</name>
+        <displayName>Kafka Topic</displayName>
+        <value>hdfs_audit</value>
+        <description>Kafka Topic</description>
+    </property>
+    <property>
+        <name>zookeeper.server</name>
+        <displayName>Zookeeper Server</displayName>
+        <value>localhost:2181</value>
+        <description>Zookeeper Server address</description>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml b/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml
new file mode 100644
index 0000000..36a64d0
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestApplicationMetadata.xml
@@ -0,0 +1,103 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<application>
+    <type>TEST_APPLICATION</type>
+    <name>Test Monitoring Application</name>
+    <version>0.5.0-incubating</version>
+    <appClass>org.apache.eagle.app.TestApplicationImpl</appClass>
+    <viewPath>/apps/example</viewPath>
+    <configuration>
+        <property>
+            <name>message.content</name>
+            <displayName>Message</displayName>
+            <value>Hello, example application!</value>
+            <description>Just a sample configuration property</description>
+        </property>
+    </configuration>
+
+    <!-- Output components -->
+    <streams>
+        <stream>
+            <streamId>TEST_STREAM_1</streamId>
+            <description>Example output stream #1</description>
+            <validate>true</validate>
+            <timeseries>true</timeseries>
+            <columns>
+                <column>
+                    <name>metric</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>value</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
+                </column>
+            </columns>
+        </stream>
+        <stream>
+            <streamId>TEST_STREAM_2</streamId>
+            <description>Example output stream #2</description>
+            <validate>true</validate>
+            <timeseries>true</timeseries>
+            <columns>
+                <column>
+                    <name>metric</name>
+                    <type>string</type>
+                </column>
+                <column>
+                    <name>value</name>
+                    <type>double</type>
+                    <defaultValue>0.0</defaultValue>
+                </column>
+            </columns>
+        </stream>
+    </streams>
+    <docs>
+        <install>
+# Step 1: Create source kafka topic named "${site}_example_source_topic"
+
+./bin/kafka-topics.sh --create --topic example_source_topic --replication-factor 1 --partitions 1
+
+# Step 2: Set up data collector to flow data into kafka topic
+
+./bin/logstash -f log_collector.conf
+
+## `log_collector.conf` sample as follows:
+
+input {
+
+}
+filter {
+
+}
+output{
+
+}
+
+# Step 3: start application
+
+# Step 4: monitor with featured portal or alert with policies
+        </install>
+        <uninstall>
+# Step 1: stop and uninstall application
+# Step 2: delete kafka topic named "${site}_example_source_topic"
+# Step 3: stop logstash
+        </uninstall>
+    </docs>
+</application>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestStreamDefinitionConf.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestStreamDefinitionConf.xml b/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestStreamDefinitionConf.xml
new file mode 100644
index 0000000..0bb0012
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/TestStreamDefinitionConf.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<streams>
+    <stream>
+        <streamId>OUTPUT_STREAM_1</streamId>
+        <description>Example output stream</description>
+        <validate>true</validate>
+        <timeseries>true</timeseries>
+        <columns>
+            <column>
+                <name>metric</name>
+                <type>string</type>
+            </column>
+            <column>
+                <name>value</name>
+                <type>double</type>
+                <defaultValue>0.0</defaultValue>
+            </column>
+        </columns>
+    </stream>
+</streams>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
new file mode 100644
index 0000000..49654c0
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/application.conf
@@ -0,0 +1,54 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	"coordinator" : {
+		"policiesPerBolt" : 5,
+		"boltParallelism" : 5,
+		"policyDefaultParallelism" : 5,
+		"boltLoadUpbound": 0.8,
+		"topologyLoadUpbound" : 0.8,
+		"numOfAlertBoltsPerTopology" : 5,
+		"zkConfig" : {
+			"zkQuorum" : "127.0.0.1:2181",
+			"zkRoot" : "/alert",
+			"zkSessionTimeoutMs" : 10000,
+			"connectionTimeoutMs" : 10000,
+			"zkRetryTimes" : 3,
+			"zkRetryInterval" : 3000
+		},
+		"metadataService" : {
+			"host" : "localhost",
+			"port" : 8080,
+			"context" : "/rest"
+		},
+		"metadataDynamicCheck" : {
+			"initDelayMillis" : 1000,
+			"delayMillis" : 30000
+		}
+	},
+	"metadata":{
+		"store": "org.apache.eagle.metadata.persistence.MemoryMetadataStore"
+	},
+	"application":{
+		"sink":{
+			"type": "org.apache.eagle.app.sink.KafkaStreamSink"
+			"boostrap.server":"localhost:9092"
+		}
+		"provider":{
+			"loader":"org.apache.eagle.app.service.loader.ApplicationProviderSPILoader"
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-app-base/src/test/resources/config_template.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/config_template.xml b/eagle-core/eagle-app/eagle-app-base/src/test/resources/config_template.xml
new file mode 100644
index 0000000..da79abc
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/config_template.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  ~
+  -->
+<configuration>
+    <obj>
+        some object
+    </obj>
+    <property>
+        <name>kafka.topic</name>
+        <displayName>Kafka Topic</displayName>
+        <value>hdfs_audit</value>
+        <description>Kafka Topic</description>
+    </property>
+    <property>
+        <name>zookeeper.server</name>
+        <displayName>Zookeeper Server</displayName>
+        <value>localhost:2181</value>
+        <description>Zookeeper Server address</description>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-app-base/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/log4j.properties b/eagle-core/eagle-app/eagle-app-base/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb13ad5
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=DEBUG, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-app-base/src/test/resources/providers.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/test/resources/providers.xml b/eagle-core/eagle-app/eagle-app-base/src/test/resources/providers.xml
new file mode 100644
index 0000000..3af7733
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-app-base/src/test/resources/providers.xml
@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<providers>
+    <provider>
+        <jarPath>target/apache-eagle-example-app.jar</jarPath>
+        <className>org.apache.eagle.app.TestApplicationImpl$Provider</className>
+    </provider>
+</providers>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-application-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/pom.xml b/eagle-core/eagle-app/eagle-application-service/pom.xml
new file mode 100644
index 0000000..3e3e8f4
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-application-service/pom.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-app-parent</artifactId>
+        <groupId>org.apache.eagle</groupId>
+        <version>0.5.0-incubating-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>eagle-application-service</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-policy-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-service-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/AppManagerConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/AppManagerConstants.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/AppManagerConstants.java
new file mode 100644
index 0000000..3aa3579
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/AppManagerConstants.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.application;
+
+
+/** String constants of the application manager: tag/field names plus envContextConfig and app command loader key names. */
+public class AppManagerConstants {
+    public static final String SITE_TAG = "site";
+    public static final String APPLICATION_TAG = "application";
+    public static final String OPERATION_TAG = "operation";
+    public static final String OPERATION_ID_TAG = "operationID";
+    public static final String TOPOLOGY_TAG = "topology";
+    public static final String FULLNAME = "fullName";
+    public static final String APPLICATION_ID = "id";
+    // envContextConfig.env / envContextConfig.url key names and the default cluster URL
+    public static final String CLUSTER_ENV = "envContextConfig.env";
+    public static final String CLUSTER_URL = "envContextConfig.url";
+    public static final String DEFAULT_CLUSTER_URL = "http://sandbox.hortonworks.com:8744";
+    // envContextConfig.mode key name and the cluster type values
+    public static final String RUNNING_MODE = "envContextConfig.mode";
+    public static final String EAGLE_CLUSTER_STORM = "storm";
+    public static final String EAGLE_CLUSTER_SPARK = "spark";
+    // key names for the app command loader switch/interval and the health check interval
+    public static final String APP_COMMAND_LOADER_ENABLED = "appCommandLoaderEnabled";
+    public static final String APP_COMMAND_LOADER_INTERVAL_SECS = "appCommandLoaderIntervalSecs";
+    public static final String APP_HEALTH_CHECK_INTERVAL_SECS = "appHealthCheckIntervalSecs";
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
new file mode 100644
index 0000000..6e4521d
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/ApplicationManagementResource.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.application;
+
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.service.application.dao.ApplicationManagerDAO;
+import org.apache.eagle.service.application.dao.ApplicationManagerDaoImpl;
+import org.apache.eagle.service.application.entity.TopologyExecutionStatus;
+import org.apache.eagle.service.application.entity.TopologyOperationEntity;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.type.TypeFactory;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.*;
+
+/** JAX-RS resource rooted at {@link #ROOT_PATH} that creates topology operations and deletes topologies through an {@link ApplicationManagerDAO}. */
+@Path(ApplicationManagementResource.ROOT_PATH)
+public class ApplicationManagementResource {
+    private final static ApplicationManagerDAO dao = new ApplicationManagerDaoImpl();
+    public final static String ROOT_PATH = "/app";
+
+    /** Checks every posted operation against its topology's current status and creates them all via the DAO; any failure is returned as success=false plus the exception. */
+    @Path("operation")
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity createOperation(InputStream inputStream) {
+        GenericServiceAPIResponseEntity response = new GenericServiceAPIResponseEntity<>();
+        List<TopologyOperationEntity> operations = new LinkedList<>();
+        try {
+            List<TopologyOperationEntity> entities = (List<TopologyOperationEntity>) unmarshalOperationEntities(inputStream);
+            if (entities == null) {
+                throw new IllegalArgumentException("inputStream cannot convert to TopologyOperationEntity");
+            }
+            for (TopologyOperationEntity entity : entities) {
+                String status = dao.loadTopologyExecutionStatus(entity.getSite(), entity.getApplication(), entity.getTopology());
+                if (status == null) {
+                    throw new Exception(String.format("Fail to fetch the topology execution status by site=%s, application=%s, topology=%s", entity.getSite(), entity.getApplication(), entity.getTopology()));
+                }
+                int operationsInRunning = dao.loadTopologyOperationsInRunning(entity.getSite(), entity.getApplication(), entity.getTopology());
+                if (operationsInRunning != 0) {
+                    throw new Exception(operationsInRunning + " operations are running, please wait for a minute");
+                }
+                if (validateOperation(entity.getOperation(), status)) {
+                    Map<String, String> tags = entity.getTags();
+                    tags.put(AppManagerConstants.OPERATION_ID_TAG, UUID.randomUUID().toString());
+                    entity.setTags(tags);
+                    entity.setLastModifiedDate(System.currentTimeMillis());
+                    entity.setTimestamp(System.currentTimeMillis());
+                    operations.add(entity);
+                } else {
+                    throw new Exception(String.format("%s is an invalid operation, as the topology's current status is %s", entity.getOperation(), status));
+                }
+            }
+            response = dao.createOperation(operations);
+        } catch (Exception e) {
+            response.setSuccess(false);
+            response.setException(e);
+        }
+        return response;
+    }
+
+    /** Tells whether {@code operation} may be applied to a topology whose execution status is {@code status}; null or unknown operations are rejected. */
+    private boolean validateOperation(String operation, String status) {
+        if (operation == null) return false; // switching on a null String would throw NullPointerException
+        switch (operation) {
+            case TopologyOperationEntity.OPERATION.START: return TopologyExecutionStatus.isReadyToStart(status);
+            case TopologyOperationEntity.OPERATION.STOP: return TopologyExecutionStatus.isReadyToStop(status);
+            default: return false;
+        }
+    }
+
+    /** Reads the JSON request body into a LinkedList of TopologyOperationEntity; an IOException from Jackson propagates to the caller. */
+    private List<? extends TaggedLogAPIEntity> unmarshalOperationEntities(InputStream inputStream) throws IllegalAccessException, InstantiationException, IOException {
+        ObjectMapper objectMapper = new ObjectMapper();
+        return objectMapper.readValue(inputStream, TypeFactory.defaultInstance().constructCollectionType(LinkedList.class, TopologyOperationEntity.class));
+    }
+
+    /** Passes the {@code topology} query parameter to the DAO's deleteTopology and returns the DAO's response unchanged. */
+    @Path("topology")
+    @DELETE
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public GenericServiceAPIResponseEntity deleteTopology(@QueryParam("topology") String topology) {
+        return dao.deleteTopology(topology);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDAO.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDAO.java
new file mode 100644
index 0000000..dfa261b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDAO.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.application.dao;
+
+
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.application.entity.TopologyExecutionStatus;
+import org.apache.eagle.service.application.entity.TopologyOperationEntity;
+
+import java.util.List;
+
+public interface ApplicationManagerDAO { // data-access operations called by ApplicationManagementResource
+    String loadTopologyExecutionStatus(String site, String application, String topology); // the REST resource treats a null result as "status could not be fetched"
+    int loadTopologyOperationsInRunning(String site, String application, String topology) throws Exception; // presumably the count of still-running operations; the REST resource rejects new operations when it is non-zero
+    GenericServiceAPIResponseEntity createOperation(List<TopologyOperationEntity> entities) throws Exception; // presumably stores the given operations; the returned entity is sent back to the REST client as-is
+    GenericServiceAPIResponseEntity deleteTopology(String topology); // receives the "topology" query parameter of the DELETE endpoint; NOTE(review): presumably a topology name, confirm in the implementation
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDaoImpl.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDaoImpl.java
new file mode 100644
index 0000000..4881cf4
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/dao/ApplicationManagerDaoImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.application.dao;
+
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.service.application.entity.TopologyExecutionEntity;
+import org.apache.eagle.service.application.entity.TopologyOperationEntity;
+import org.apache.eagle.service.generic.GenericEntityServiceResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * {@link ApplicationManagerDAO} implementation that runs its queries, updates
+ * and deletes through a {@link GenericEntityServiceResource} instance.
+ */
+public class ApplicationManagerDaoImpl implements ApplicationManagerDAO {
+    private static final Logger LOG = LoggerFactory.getLogger(ApplicationManagerDaoImpl.class);
+    GenericEntityServiceResource resource = new GenericEntityServiceResource();
+
+    /**
+     * Looks up the status of the topology execution tagged with the given
+     * site, application and topology.
+     *
+     * @return the stored status, or {@code null} when the query is not
+     *         successful or does not match exactly one entity (both cases are
+     *         logged at error level)
+     */
+    @Override
+    public String loadTopologyExecutionStatus(String site, String application, String topology) {
+        String query = String.format("%s[@site=\"%s\" AND @application=\"%s\" AND @topology=\"%s\"]{*}", Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, site, application, topology);
+        GenericServiceAPIResponseEntity<TopologyExecutionEntity> response = resource.search(query,  null, null, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false);
+        if(!response.isSuccess()) {
+            LOG.error(response.getException());
+            return null;
+        }
+        List<TopologyExecutionEntity> list = response.getObj();
+        if(list == null || list.size() != 1) {
+            LOG.error("ERROR: fetching 0 or more than 1 topology execution entities");
+            return null;
+        }
+        return list.get(0).getStatus();
+    }
+
+    /**
+     * Counts the operations of the given topology whose status is
+     * {@code INITIALIZED} or {@code PENDING}.
+     *
+     * @return number of matching operation entities, {@code 0} when the
+     *         response carries no entity list
+     * @throws Exception carrying the response's exception text when the query
+     *         is not successful
+     */
+    @Override
+    public int loadTopologyOperationsInRunning(String site, String application, String topology) throws Exception {
+        String query = String.format("%s[@site=\"%s\" AND @application=\"%s\" AND @topology=\"%s\" AND (@status=\"%s\" OR @status=\"%s\")]{*}", Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, site, application, topology, TopologyOperationEntity.OPERATION_STATUS.INITIALIZED, TopologyOperationEntity.OPERATION_STATUS.PENDING);
+        // The query targets the operation service, so the payload is typed as operation entities.
+        GenericServiceAPIResponseEntity<TopologyOperationEntity> response = resource.search(query, null, null, Integer.MAX_VALUE, null, false, false, 0L, 0, false, 0, null, false);
+        if(!response.isSuccess()) {
+            throw new Exception(response.getException());
+        }
+        List<TopologyOperationEntity> operations = response.getObj();
+        return operations == null ? 0 : operations.size();
+    }
+
+    /**
+     * Submits the given operations to the topology operation service.
+     * An empty list is logged at info level and still submitted.
+     *
+     * @param entities operations to store
+     * @return the response returned by {@code updateEntities}
+     */
+    @Override
+    public GenericServiceAPIResponseEntity createOperation(List<TopologyOperationEntity> entities) throws Exception {
+        if(entities.size() == 0) {
+            LOG.info("TopologyOperationEntity set is empty.");
+        }
+        GenericServiceAPIResponseEntity response = resource.updateEntities(entities, Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME);
+        return response;
+    }
+
+    /**
+     * Deletes the description entities of the named topology and, only if
+     * that delete succeeded, its execution entities.
+     *
+     * @param topology value of the topology tag to match
+     * @return the response of the last delete that was attempted; its
+     *         exception text is logged when it is not successful
+     */
+    @Override
+    public GenericServiceAPIResponseEntity deleteTopology(String topology) {
+        String topologyQuery = Constants.TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME+ "[@topology=\"" + topology + "\"]{*}";
+        String executionQuery = Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME + "[@topology=\"" + topology + "\"]{*}";
+        int pageSize = Integer.MAX_VALUE;
+
+        GenericServiceAPIResponseEntity response = resource.deleteByQuery(topologyQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
+        if(response.isSuccess()) {
+            response = resource.deleteByQuery(executionQuery, null, null, pageSize, null, false, false, 0L, 0, true, 0, null, false);
+        }
+        if(!response.isSuccess()) {
+            LOG.error(response.getException());
+        }
+        return response;
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/ApplicationEntityRepo.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/ApplicationEntityRepo.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/ApplicationEntityRepo.java
new file mode 100644
index 0000000..3226650
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/ApplicationEntityRepo.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.application.entity;
+
+
+import org.apache.eagle.log.entity.repo.EntityRepository;
+
+/**
+ * Entity repository that registers the three topology entity classes
+ * (description, execution and operation) when it is constructed.
+ */
+public class ApplicationEntityRepo extends EntityRepository {
+
+    /** Registers the topology description, execution and operation entities, in that order. */
+    public ApplicationEntityRepo() {
+        registerEntity(TopologyDescriptionEntity.class);
+        registerEntity(TopologyExecutionEntity.class);
+        registerEntity(TopologyOperationEntity.class);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyDescriptionEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyDescriptionEntity.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyDescriptionEntity.java
new file mode 100644
index 0000000..6442e6c
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyDescriptionEntity.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.application.entity;
+
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.service.application.AppManagerConstants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Metadata entity describing a topology: its executable class, type,
+ * free-text description and version. The topology name is carried as the
+ * {@code topology} tag rather than as a column.
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("eagle_metadata")
+@ColumnFamily("f")
+@Prefix("topologyDescription")
+@Service(Constants.TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"topology"})
+public class TopologyDescriptionEntity extends TaggedLogAPIEntity {
+
+    /** String constants for the {@code type} field. */
+    public static final class TYPE {
+        public static final String DYNAMIC = "DYNAMIC";
+        public static final String CLASS = "CLASS";
+    }
+
+    @Column("a")
+    private String exeClass;
+    @Column("b")
+    private String type;
+    @Column("c")
+    private String description;
+    @Column("d")
+    private String version;
+    // No @Column annotation and no valueChanged() call in its setter, unlike the
+    // fields above. NOTE(review): presumably not persisted - confirm this is intended.
+    private String context;
+
+    /** @return the value of the {@code topology} tag */
+    public String getTopology() {
+        return getTags().get(AppManagerConstants.TOPOLOGY_TAG);
+    }
+
+    public String getExeClass() {
+        return exeClass;
+    }
+
+    /** Sets the executable class and reports the change through {@code valueChanged}. */
+    public void setExeClass(String exeClass) {
+        this.exeClass = exeClass;
+        valueChanged("exeClass");
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    /** Sets the type and reports the change through {@code valueChanged}. */
+    public void setType(String type) {
+        this.type = type;
+        valueChanged("type");
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    /** Sets the description and reports the change through {@code valueChanged}. */
+    public void setDescription(String description) {
+        this.description = description;
+        valueChanged("description");
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    /** Sets the version and reports the change through {@code valueChanged}. */
+    public void setVersion(String version) {
+        this.version = version;
+        valueChanged("version");
+    }
+
+    public String getContext() {
+        return context;
+    }
+
+    /** Sets the context; no change notification is issued for this field. */
+    public void setContext(String context) {
+        this.context = context;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionEntity.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionEntity.java
new file mode 100644
index 0000000..9991d3b
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionEntity.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.application.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.apache.eagle.policy.common.Constants;
+
+import org.apache.eagle.service.application.AppManagerConstants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Metadata entity holding the execution record of a topology. The record is
+ * identified by the {@code site}, {@code application} and {@code topology}
+ * tags; the remaining attributes are stored as columns.
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("eagle_metadata")
+@ColumnFamily("f")
+@Prefix("topologyExecution")
+@Service(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(false)
+@Tags({"site", "application", "topology"})
+public class TopologyExecutionEntity extends TaggedLogAPIEntity {
+    @Column("a")
+    private String fullName;
+    @Column("b")
+    private String url;
+    @Column("c")
+    private String description;
+    @Column("d")
+    private String status;
+    @Column("e")
+    private long lastModifiedDate;
+    @Column("f")
+    private String mode;
+    @Column("g")
+    private String environment;
+
+    // ---- tag accessors ----
+
+    /** @return the value of the {@code site} tag */
+    public String getSite() {
+        return getTags().get(AppManagerConstants.SITE_TAG);
+    }
+
+    /** @return the value of the {@code application} tag */
+    public String getApplication() {
+        return getTags().get(AppManagerConstants.APPLICATION_TAG);
+    }
+
+    /** @return the value of the {@code topology} tag */
+    public String getTopology() {
+        return getTags().get(AppManagerConstants.TOPOLOGY_TAG);
+    }
+
+    // ---- column accessors; every setter reports the change through valueChanged ----
+
+    public String getFullName() {
+        return fullName;
+    }
+
+    public void setFullName(String fullName) {
+        this.fullName = fullName;
+        valueChanged("fullName");
+    }
+
+    public String getUrl() {
+        return url;
+    }
+
+    public void setUrl(String url) {
+        this.url = url;
+        valueChanged("url");
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public void setDescription(String description) {
+        this.description = description;
+        valueChanged("description");
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        valueChanged("status");
+    }
+
+    public long getLastModifiedDate() {
+        return lastModifiedDate;
+    }
+
+    public void setLastModifiedDate(long lastModifiedDate) {
+        this.lastModifiedDate = lastModifiedDate;
+        valueChanged("lastModifiedDate");
+    }
+
+    public String getMode() {
+        return mode;
+    }
+
+    public void setMode(String mode) {
+        this.mode = mode;
+        valueChanged("mode");
+    }
+
+    public String getEnvironment() {
+        return environment;
+    }
+
+    public void setEnvironment(String environment) {
+        this.environment = environment;
+        valueChanged("environment");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionStatus.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionStatus.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionStatus.java
new file mode 100644
index 0000000..f62ad8a
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyExecutionStatus.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.application.entity;
+
+
+/**
+ * String constants for the status of a topology execution, plus helpers
+ * that decide whether a start or stop may be requested from a given status.
+ */
+public class TopologyExecutionStatus {
+    public final static String STOPPED = "STOPPED";
+    public final static String STARTED = "STARTED";
+    public final static String STARTING = "STARTING";
+    public final static String STOPPING = "STOPPING";
+    public final static String NEW = "NEW";
+
+    /**
+     * Tells whether a topology in the given status may be started.
+     *
+     * @param status current status; may be {@code null} (for example when the
+     *               status could not be loaded), which is treated as not ready
+     * @return {@code true} only for {@link #STOPPED} or {@link #NEW}
+     */
+    public static boolean isReadyToStart(String status){
+        // Compare from the constant side so a null status yields false instead of an NPE.
+        return STOPPED.equals(status) || NEW.equals(status);
+    }
+
+    /**
+     * Tells whether a topology in the given status may be stopped.
+     *
+     * @param status current status; may be {@code null}, which is treated as not ready
+     * @return {@code true} only for {@link #STARTED}
+     */
+    public static boolean isReadyToStop(String status){
+        return STARTED.equals(status);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyOperationEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyOperationEntity.java b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyOperationEntity.java
new file mode 100644
index 0000000..6d8f1a0
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-application-service/src/main/java/org/apache/eagle/service/application/entity/TopologyOperationEntity.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.service.application.entity;
+
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.apache.eagle.log.entity.meta.*;
+import org.apache.eagle.policy.common.Constants;
+
+import org.apache.eagle.service.application.AppManagerConstants;
+import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.map.annotate.JsonSerialize;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Time-series entity recording one operation requested on a topology. The
+ * operation is identified by the {@code site}, {@code application},
+ * {@code topology}, {@code operationID} and {@code operation} tags; status,
+ * message and last-modified time are stored as columns.
+ */
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+@Table("eagle_metadata")
+@ColumnFamily("f")
+@Prefix("topologyOperation")
+@Service(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME)
+@JsonIgnoreProperties(ignoreUnknown = true)
+@TimeSeries(true)
+@Tags({"site", "application", "topology", "operationID", "operation"})
+public class TopologyOperationEntity extends TaggedLogAPIEntity {
+
+    /** String constants naming the kinds of operation. */
+    public static final class OPERATION {
+        public static final String START = "START";
+        public static final String STOP = "STOP";
+        public static final String STATUS = "STATUS";
+    }
+
+    /** String constants for the {@code status} field. */
+    public static final class OPERATION_STATUS {
+        public static final String PENDING = "PENDING";
+        public static final String INITIALIZED = "INITIALIZED";
+        public static final String SUCCESS = "SUCCESS";
+        public static final String FAILED = "FAILED";
+    }
+
+    @Column("a")
+    private String status;
+    @Column("b")
+    private String message;
+    @Column("c")
+    private long lastModifiedDate;
+
+    // ---- tag accessors ----
+
+    /** @return the value of the {@code site} tag */
+    public String getSite() {
+        return getTags().get(AppManagerConstants.SITE_TAG);
+    }
+
+    /** @return the value of the {@code application} tag */
+    public String getApplication() {
+        return getTags().get(AppManagerConstants.APPLICATION_TAG);
+    }
+
+    /** @return the value of the {@code topology} tag */
+    public String getTopology() {
+        return getTags().get(AppManagerConstants.TOPOLOGY_TAG);
+    }
+
+    /** @return the value of the {@code operation} tag */
+    public String getOperation() {
+        return getTags().get(AppManagerConstants.OPERATION_TAG);
+    }
+
+    // ---- column accessors; every setter reports the change through valueChanged ----
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+        valueChanged("status");
+    }
+
+    public String getMessage() {
+        return message;
+    }
+
+    public void setMessage(String message) {
+        this.message = message;
+        valueChanged("message");
+    }
+
+    public long getLastModifiedDate() {
+        return lastModifiedDate;
+    }
+
+    public void setLastModifiedDate(long lastModifiedDate) {
+        this.lastModifiedDate = lastModifiedDate;
+        valueChanged("lastModifiedDate");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml b/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml
new file mode 100644
index 0000000..919188f
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-app-parent</artifactId>
+        <groupId>org.apache.eagle</groupId>
+        <version>0.5.0-incubating-SNAPSHOT</version>
+    </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>eagle-stream-application-manager</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-application-service</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>eagle-stream-pipeline</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${storm.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>ch.qos.logback</groupId>
+                    <artifactId>logback-classic</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>log4j-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor_${scala.version}</artifactId>
+            <version>${akka.actor.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-testkit_${scala.version}</artifactId>
+            <version>${akka.actor.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.scala-tools</groupId>
+                <artifactId>maven-scala-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <configuration>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <junitxml>.</junitxml>
+                    <filereports>TestSuite.txt</filereports>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java
new file mode 100644
index 0000000..d382629
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application;
+
+
+public class TopologyException extends Exception {
+    public TopologyException(String s, Exception e) { super(s,e); }
+    public TopologyException(Exception e) { super(e); }
+    public TopologyException(String s) { super(s); }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java
new file mode 100644
index 0000000..8f625c7
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyExecutable.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application;
+
+
+import com.typesafe.config.Config;
+
+
+/**
+ * Contract for a topology that can be submitted under a name with a
+ * Typesafe {@link Config}.
+ */
+public interface TopologyExecutable {
+
+    /**
+     * Submits this topology.
+     *
+     * @param topology name under which the topology is submitted
+     * @param config   configuration handed to the topology
+     */
+    void submit(String topology, Config config);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
new file mode 100644
index 0000000..e32f48e
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application;
+
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Static factory that instantiates {@link TopologyExecutable} implementations by class name
+ * and caches one instance per class name for the lifetime of this class.
+ */
+public final class TopologyFactory {
+    public static final Logger LOG = LoggerFactory.getLogger(TopologyFactory.class);
+    private static final Map<String, TopologyExecutable> topologyCache = Collections.synchronizedMap(new HashMap<String, TopologyExecutable>());
+
+    /**
+     * Returns the cached instance for {@code topologyClass}, creating it reflectively through
+     * its no-argument constructor on first use.
+     *
+     * @param topologyClass fully-qualified name of a class implementing {@link TopologyExecutable}
+     * @return the single cached instance for that class name
+     * @throws TopologyException if the class cannot be found, cannot be instantiated, or does
+     *                           not implement {@link TopologyExecutable}
+     */
+    public static TopologyExecutable getTopologyInstance(String topologyClass) throws TopologyException {
+        // A synchronized map only makes single calls atomic. The lookup-then-insert sequence
+        // must hold the map's own monitor so that two threads cannot both create an instance.
+        synchronized (topologyCache) {
+            TopologyExecutable instance = topologyCache.get(topologyClass);
+            if (instance != null) {
+                return instance;
+            }
+            try {
+                // Placeholders avoid a NullPointerException when getClassLoader() returns null.
+                LOG.info("load class {} with classLoader {}", topologyClass, TopologyFactory.class.getClassLoader());
+                instance = (TopologyExecutable) Class.forName(topologyClass).newInstance();
+            } catch (ClassNotFoundException e) {
+                throw new TopologyException("Topology in type of " + topologyClass + " is not found", e);
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new TopologyException(e);
+            } catch (ClassCastException e) {
+                throw new TopologyException("Topology class " + topologyClass + " does not implement " + TopologyExecutable.class.getName(), e);
+            }
+            topologyCache.put(topologyClass, instance);
+            return instance;
+        }
+    }
+
+    /**
+     * Looks up (or creates) the instance for {@code topologyClass} and calls its
+     * {@code submit} method, passing the class name itself as the topology argument.
+     *
+     * @param topologyClass fully-qualified name of the {@link TopologyExecutable} implementation
+     * @param config        configuration forwarded to the implementation
+     * @throws TopologyException if the instance cannot be obtained
+     */
+    public static void submit(String topologyClass, Config config) throws TopologyException {
+        getTopologyInstance(topologyClass).submit(topologyClass, config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
new file mode 100644
index 0000000..3e918cc
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import com.typesafe.config.Config
+import org.apache.eagle.datastream.core.StreamContext
+import org.apache.eagle.stream.pipeline.Pipeline
+
+
+/**
+ * Base trait for [[TopologyExecutable]] implementations that build their stream from a
+ * pipeline definition supplied as a string.
+ */
+trait AbstractDynamicApplication extends TopologyExecutable {
+
+  /**
+   * Parses `application` together with `config` using `Pipeline.parseStringWithConfig` and
+   * compiles the resulting pipeline with `Pipeline.compile`.
+   *
+   * @param application pipeline definition text
+   * @param config      configuration supplied to the parser
+   * @return the stream context produced by the compilation
+   */
+  def compileStream(application: String, config: Config): StreamContext =
+    Pipeline.compile(Pipeline.parseStringWithConfig(application, config))
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
new file mode 100644
index 0000000..bbfaedd
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import java.util
+
+import com.google.common.base.Preconditions
+import org.apache.eagle.service.application.entity.TopologyExecutionStatus
+import org.apache.eagle.stream.application.impl.StormExecutionPlatform
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConversions
+
+
+/**
+ * Registry of [[TaskExecutor]] worker threads keyed by a caller-supplied id.
+ *
+ * The registry is a `java.util.TreeMap` created without a comparator, so ids must be mutually
+ * comparable (for example all strings). The map itself is not synchronized.
+ * NOTE(review): confirm that callers only use this object from a single thread.
+ */
+object ApplicationManager {
+  private val LOG: Logger = LoggerFactory.getLogger(ApplicationManager.getClass)
+  private val workerMap: util.Map[AnyRef, TaskExecutor] = new util.TreeMap[AnyRef, TaskExecutor]
+
+  /** Returns the registry itself (not a copy); changes made by the caller are visible here. */
+  def getWorkerMap: util.Map[AnyRef, TaskExecutor] = workerMap
+
+  /**
+   * Wraps `runnable` in a new daemon [[TaskExecutor]] named after `id`, registers it and
+   * starts it. An existing executor under the same id is replaced only if it is no longer alive.
+   *
+   * @param id       registry key; its `toString` becomes the thread name
+   * @param runnable work to run on the new executor
+   * @return the started executor
+   * @throws IllegalArgumentException if an executor registered under `id` is still alive
+   */
+  def submit(id: AnyRef, runnable: Runnable): TaskExecutor = {
+    if (workerMap.containsKey(id)) {
+      val executor: Thread = workerMap.get(id)
+      // isAlive is already false for terminated (and never-started) threads, so the liveness
+      // test alone decides whether the slot may be reused.
+      if (executor.isAlive) {
+        throw new IllegalArgumentException("Duplicated id '" + id + "'")
+      }
+      LOG.info("Replacing dead executor: {}", executor)
+      workerMap.remove(id)
+    }
+    val worker: TaskExecutor = new TaskExecutor(runnable)
+    LOG.info("Registering new executor %s: %s".format(id, worker))
+    workerMap.put(id, worker)
+    worker.setName(id.toString)
+    worker.setDaemon(true)
+    worker.start()
+    worker
+  }
+
+  /**
+   * Returns the executor registered under `id`.
+   *
+   * @throws IllegalArgumentException if no executor is registered under `id`
+   */
+  def get(id: AnyRef): TaskExecutor = {
+    Preconditions.checkArgument(workerMap.containsKey(id))
+    workerMap.get(id)
+  }
+
+  /**
+   * Interrupts the executor registered under `id` and returns it. The executor stays in the
+   * registry; use `remove` to drop it once it is no longer alive.
+   *
+   * @throws IllegalArgumentException if no executor is registered under `id`
+   */
+  @throws(classOf[Exception])
+  def stop(id: AnyRef): TaskExecutor = {
+    val worker: TaskExecutor = get(id)
+    worker.interrupt()
+    worker
+  }
+
+  /**
+   * Translates a JVM thread state into a `TopologyExecutionStatus` value.
+   *
+   * RUNNABLE, BLOCKED, WAITING and TIMED_WAITING all describe a thread that has been started
+   * and has not finished, so each of them is reported as started.
+   *
+   * @throws IllegalStateException if `state` is null or otherwise unrecognised
+   */
+  def getWorkerStatus(state: Thread.State): String = state match {
+    case Thread.State.RUNNABLE | Thread.State.BLOCKED | Thread.State.TIMED_WAITING | Thread.State.WAITING =>
+      TopologyExecutionStatus.STARTED
+    case Thread.State.NEW =>
+      TopologyExecutionStatus.STARTING
+    case Thread.State.TERMINATED =>
+      TopologyExecutionStatus.STOPPED
+    case _ =>
+      throw new IllegalStateException("Unknown state: " + state)
+  }
+
+  /**
+   * Translates a platform status string into a `TopologyExecutionStatus` value: a status equal
+   * to `StormExecutionPlatform.KILLED` (ignoring case) is reported as stopping, anything else
+   * (including null) as started.
+   */
+  def getTopologyStatus(status: String): String = {
+    if (StormExecutionPlatform.KILLED.equalsIgnoreCase(status)) TopologyExecutionStatus.STOPPING
+    else TopologyExecutionStatus.STARTED
+  }
+
+  /**
+   * Removes the executor registered under `id` from the registry.
+   *
+   * @throws IllegalArgumentException if no executor is registered under `id`
+   * @throws RuntimeException         if the executor is still alive
+   */
+  def remove(id: AnyRef): Unit = {
+    val executor: TaskExecutor = this.get(id)
+    if (executor.isAlive) {
+      throw new RuntimeException("Failed to remove alive executor '" + id + "'")
+    }
+    this.workerMap.remove(id)
+  }
+
+  /** Interrupts every registered executor whose interrupt flag is not already set. */
+  def stopAll(): Unit = {
+    val workers = workerMap.values().iterator()
+    while (workers.hasNext) {
+      val worker = workers.next()
+      if (!worker.isInterrupted) {
+        worker.interrupt()
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
new file mode 100644
index 0000000..4c2df77
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.TopologyExecutionEntity
+
+
+/** Small naming and URL helpers for the application manager. */
+object ApplicationManagerUtils {
+
+  /**
+   * Builds the full topology name `eagle-<site>-<application>-<topology>` from the
+   * corresponding fields of `topologyExecution`.
+   */
+  def generateTopologyFullName(topologyExecution: TopologyExecutionEntity): String =
+    "eagle-%s-%s-%s".format(topologyExecution.getSite, topologyExecution.getApplication, topologyExecution.getTopology)
+
+  /**
+   * Builds the topology page URL for `topologyID`.
+   *
+   * The base URL is read from `config` at `AppManagerConstants.CLUSTER_URL` when that path
+   * exists, otherwise `AppManagerConstants.DEFAULT_CLUSTER_URL` is used.
+   */
+  def buildStormTopologyURL(config: Config, topologyID: String): String = {
+    val baseURL =
+      if (config.hasPath(AppManagerConstants.CLUSTER_URL)) {
+        config.getString(AppManagerConstants.CLUSTER_URL)
+      } else {
+        AppManagerConstants.DEFAULT_CLUSTER_URL
+      }
+    baseURL + "/topology.html?id=" + topologyID
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
new file mode 100644
index 0000000..ae0f6e8
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import java.util
+import java.util.concurrent.Callable
+
+import akka.dispatch.Futures
+import com.typesafe.config.Config
+import org.apache.eagle.alert.entity.SiteApplicationServiceEntity
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity
+import org.apache.eagle.policy.common.Constants
+import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, TopologyExecutionEntity, TopologyExecutionStatus, TopologyOperationEntity}
+import org.apache.eagle.service.client.EagleServiceConnector
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConversions
+import scala.concurrent.ExecutionContext
+
+
+/**
+ * Asynchronous access to the Eagle service for the application scheduler.
+ *
+ * Each public operation wraps its service calls in a `Callable` that is handed to
+ * `Futures.future` together with `ex`. Every call creates its own service client and closes
+ * it when the call ends, whether it returned normally or threw.
+ *
+ * @param config configuration used to build the [[EagleServiceConnector]]
+ * @param ex     execution context passed to `Futures.future`
+ */
+class ApplicationSchedulerAsyncDAO(config: Config, ex: ExecutionContext) {
+  private val LOG: Logger = LoggerFactory.getLogger(classOf[ApplicationSchedulerAsyncDAO])
+  private val connector: EagleServiceConnector = new EagleServiceConnector(config)
+
+  /** Creates a new service client on the shared connector; the caller must close it. */
+  def getEagleServiceClient(): EagleServiceClientImpl = {
+    new EagleServiceClientImpl(connector)
+  }
+
+  /** Runs `body` with a fresh client and always closes the client afterwards. */
+  private def withClient[T](body: EagleServiceClientImpl => T): T = {
+    val client = getEagleServiceClient()
+    try {
+      body(client)
+    } finally {
+      client.close()
+    }
+  }
+
+  /**
+   * Loads every topology operation whose status equals `status`.
+   * The future fails if the query is unsuccessful or returns no result object.
+   */
+  def readOperationsByStatus(status: String) = {
+    Futures.future(new Callable[util.List[TopologyOperationEntity]]{
+      override def call(): util.List[TopologyOperationEntity] = withClient { client =>
+        val query = "%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, status)
+        val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] = client.search(query).pageSize(Int.MaxValue).send()
+        if(!response.isSuccess || response.getObj == null)
+          throw new Exception(s"Fail to load operations with status $status")
+        response.getObj
+      }
+    }, ex)
+  }
+
+  /**
+   * Loads every topology execution whose status is not `TopologyExecutionStatus.NEW`.
+   * The future fails if the query is unsuccessful or returns no result object.
+   */
+  def loadAllTopologyExecutionEntities() = {
+    Futures.future(new Callable[util.List[TopologyExecutionEntity]]{
+      override def call(): util.List[TopologyExecutionEntity] = withClient { client =>
+        val query = "%s[@status != \"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, TopologyExecutionStatus.NEW)
+        val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] = client.search(query).pageSize(Int.MaxValue).send()
+        if(!response.isSuccess || response.getObj == null) throw new Exception(response.getException)
+        response.getObj
+      }
+    }, ex)
+  }
+
+  /**
+   * Loads the single topology execution matching `site`, `appName` and `topologyName`.
+   * The future fails if the query is unsuccessful or does not return exactly one entity.
+   */
+  def loadTopologyExecutionByName(site: String, appName: String, topologyName: String) = {
+    Futures.future(new Callable[TopologyExecutionEntity]{
+      override def call(): TopologyExecutionEntity = withClient { client =>
+        val query = "%s[@site=\"%s\" AND @application=\"%s\" AND @topology=\"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, site, appName, topologyName)
+        LOG.info(s"query=$query")
+        val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] = client.search(query).pageSize(Int.MaxValue).send()
+        if(!response.isSuccess || response.getObj == null)
+          throw new Exception(s"Fail to load topologyExecutionEntity with application=$appName topology=$topologyName due to Exception: ${response.getException}")
+        if(response.getObj.size() != 1) {
+          throw new Exception(s"Get 0 or more than 1 topologyExecutionEntity with application=$appName topology=$topologyName")
+        }
+        response.getObj.get(0)
+      }
+    }, ex)
+  }
+
+  /**
+   * Loads the first topology description named `topologyName` and sets its context to the
+   * config of the first site-application entity matching `site` and `application`.
+   * The future fails if either query is unsuccessful or empty.
+   */
+  def loadTopologyDescriptionByName(site: String, application: String, topologyName: String) = {
+    Futures.future(new Callable[TopologyDescriptionEntity]{
+      override def call(): TopologyDescriptionEntity = withClient { client =>
+        val descriptionQuery = "%s[@topology=\"%s\"]{*}".format(Constants.TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME, topologyName)
+        val response: GenericServiceAPIResponseEntity[TopologyDescriptionEntity] = client.search(descriptionQuery).pageSize(Int.MaxValue).send()
+        if(!response.isSuccess || response.getObj == null || response.getObj.size() == 0)
+          throw new Exception(s"Fail to load TopologyDescriptionEntity with site=$site application=$application topology=$topologyName due to Exception: ${response.getException}")
+        val topologyDescriptionEntity = response.getObj.get(0)
+
+        val query = "%s[@site=\"%s\" AND @application=\"%s\"]{*}".format(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME, site, application)
+        val configResponse: GenericServiceAPIResponseEntity[SiteApplicationServiceEntity] = client.search(query).pageSize(Int.MaxValue).send()
+        if(!configResponse.isSuccess || configResponse.getObj == null || configResponse.getObj.size() == 0)
+          throw new Exception(s"Fail to load topology configuration with query=$query due to Exception: ${configResponse.getException}")
+        val siteApplicationEntity = configResponse.getObj.get(0)
+        topologyDescriptionEntity.setContext(siteApplicationEntity.getConfig)
+        topologyDescriptionEntity
+      }
+    }, ex)
+  }
+
+  /**
+   * Stamps `operation` with the current time and writes it back to the service.
+   * The future fails with a `RuntimeException` if the update is unsuccessful.
+   */
+  def updateOperationStatus(operation: TopologyOperationEntity) = {
+    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
+      override def call(): GenericServiceAPIResponseEntity[String] = {
+        if(LOG.isDebugEnabled()) LOG.debug(s"Updating status of command[$operation] as ${operation.getStatus}")
+        operation.setLastModifiedDate(System.currentTimeMillis())
+        val response = withClient { client =>
+          client.update(java.util.Arrays.asList(operation), classOf[TopologyOperationEntity])
+        }
+        if(response.isSuccess) {
+          LOG.info(s"Updated operation status [$operation] as: ${operation.getStatus}")
+        } else {
+          LOG.error(s"Failed to update status as ${operation.getStatus} of command[$operation]")
+          throw new RuntimeException(s"Failed to update command due to exception: ${response.getException}")
+        }
+        response
+      }
+    }, ex)
+  }
+
+  /**
+   * Stamps `topology` with the current time and writes it back to the service.
+   * An unsuccessful update is only logged; the response is returned either way.
+   */
+  def updateTopologyExecutionStatus(topology: TopologyExecutionEntity) = {
+    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
+      override def call(): GenericServiceAPIResponseEntity[String] = {
+        if(LOG.isDebugEnabled()) LOG.debug(s"Updating status of app[$topology] as ${topology.getStatus}")
+        topology.setLastModifiedDate(System.currentTimeMillis())
+        // The client must remain open for the update; withClient closes it only afterwards.
+        val response = withClient { client =>
+          client.update(java.util.Arrays.asList(topology), classOf[TopologyExecutionEntity])
+        }
+        if(response.isSuccess) {
+          LOG.info(s"Updated status application[$topology] as: ${topology.getStatus}")
+        } else {
+          LOG.error(s"Failed to update status as ${topology.getStatus} of application[$topology] due to ${response.getException}")
+        }
+        response
+      }
+    }, ex)
+  }
+
+  /**
+   * Marks every operation currently in PENDING status as FAILED.
+   * Returns the update response, or a fresh empty response entity when the search is
+   * unsuccessful or finds nothing to update.
+   */
+  def clearPendingOperations() = {
+    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
+      override def call(): GenericServiceAPIResponseEntity[String] = {
+        LOG.info("start to clear operation")
+        val query: String = "%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, TopologyOperationEntity.OPERATION_STATUS.PENDING)
+        withClient { client =>
+          val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] = client.search(query).pageSize(Int.MaxValue).send()
+          if (response.isSuccess && response.getObj != null && response.getObj.size != 0) {
+            val pendingOperations: util.List[TopologyOperationEntity] = response.getObj
+            val failedOperations: util.List[TopologyOperationEntity] = new util.ArrayList[TopologyOperationEntity]
+            JavaConversions.collectionAsScalaIterable(pendingOperations) foreach { operation =>
+              operation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+              failedOperations.add(operation)
+            }
+            val ret: GenericServiceAPIResponseEntity[String] = client.update(failedOperations, Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME)
+            if (ret.isSuccess) {
+              LOG.info(s"Successfully clear ${failedOperations.size()} pending operations")
+            } else {
+              LOG.error(s"Failed to clear pending operations due to exception:" + ret.getException)
+            }
+            ret
+          } else {
+            new GenericServiceAPIResponseEntity[String]()
+          }
+        }
+      }
+    }, ex)
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/e21b073f/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
new file mode 100644
index 0000000..88271bb
--- /dev/null
+++ b/eagle-core/eagle-app/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyDescriptionEntity}
+
+
+/**
+ * Operations an execution platform offers for topologies.
+ *
+ * All methods return `Unit`, so any outcome must be communicated through side effects.
+ * NOTE(review): presumably by updating the passed execution entities - confirm against the
+ * implementations, which are not visible here.
+ */
+trait ExecutionPlatform {
+  /** Starts `topology` for the given execution record using `config`. */
+  def start(topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit
+
+  /** Stops the topology referred to by `topologyExecution` using `config`. */
+  def stop(topologyExecution: TopologyExecutionEntity, config: Config): Unit
+
+  /** Status operation over a list of execution records. */
+  def status(topologyExecutions: java.util.List[TopologyExecutionEntity], config: Config): Unit
+
+  /** Status operation for a single execution record. */
+  def status(topologyExecution: TopologyExecutionEntity, config: Config): Unit
+}