You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by mw...@apache.org on 2016/07/25 09:36:40 UTC

[03/47] incubator-eagle git commit: EAGLE-271 Topology management in remote/local mode including start/stop operations

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
new file mode 100644
index 0000000..e32f48e
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application;
+
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+public final class TopologyFactory {
+    public static Logger LOG = LoggerFactory.getLogger(TopologyFactory.class);
+    private final static Map<String, TopologyExecutable> topologyCache = Collections.synchronizedMap(new HashMap<String, TopologyExecutable>());
+    public static TopologyExecutable getTopologyInstance(String topologyClass) throws TopologyException {
+        TopologyExecutable instance;
+        if(topologyCache.containsKey(topologyClass)){
+            instance = topologyCache.get(topologyClass);
+        } else {
+            try {
+                LOG.info("load class " + topologyClass + "with classLoader " + TopologyFactory.class.getClassLoader().toString());
+                instance = (TopologyExecutable) Class.forName(topologyClass).newInstance();
+                topologyCache.put(topologyClass, instance);
+            } catch (ClassNotFoundException e) {
+                throw new TopologyException("Topology in type of " + topologyClass + " is not found",e);
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new TopologyException(e);
+            }
+        }
+        return instance;
+    }
+
+    public static void submit(String topologyClass, Config config) throws TopologyException {
+        TopologyExecutable topology = getTopologyInstance(topologyClass);
+        topology.submit(topologyClass, config);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
new file mode 100644
index 0000000..3e918cc
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import com.typesafe.config.Config
+import org.apache.eagle.datastream.core.StreamContext
+import org.apache.eagle.stream.pipeline.Pipeline
+
+
+trait AbstractDynamicApplication extends TopologyExecutable {
+  def compileStream(application: String, config: Config): StreamContext = {
+    val pipeline = Pipeline.parseStringWithConfig(application, config)
+    Pipeline.compile(pipeline)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
new file mode 100644
index 0000000..bbfaedd
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import java.util
+
+import com.google.common.base.Preconditions
+import org.apache.eagle.service.application.entity.TopologyExecutionStatus
+import org.apache.eagle.stream.application.impl.StormExecutionPlatform
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConversions
+
+
+object ApplicationManager {
+  private val LOG: Logger = LoggerFactory.getLogger(ApplicationManager.getClass)
+  private val workerMap: util.Map[AnyRef, TaskExecutor] = new util.TreeMap[AnyRef, TaskExecutor]
+
+  def getWorkerMap: util.Map[AnyRef, TaskExecutor] = {
+    return workerMap
+  }
+
+  def submit(id: AnyRef, runnable: Runnable): TaskExecutor = {
+    if (workerMap.containsKey(id)) {
+      val executor: Thread = workerMap.get(id)
+      if (!executor.isAlive || executor.getState.equals() ) {
+        LOG.info("Replacing dead executor: {}", executor)
+        workerMap.remove(id)
+      }
+      else {
+        throw new IllegalArgumentException("Duplicated id '" + id + "'")
+      }
+    }
+    val worker: TaskExecutor = new TaskExecutor(runnable)
+    LOG.info("Registering new executor %s: %s".format(id, worker))
+    workerMap.put(id, worker)
+    worker.setName(id.toString)
+    worker.setDaemon(true)
+    worker.start
+    return worker
+  }
+
+  def get(id: AnyRef): TaskExecutor = {
+    Preconditions.checkArgument(workerMap.containsKey(id))
+    return workerMap.get(id)
+  }
+
+  @throws(classOf[Exception])
+  def stop(id: AnyRef): TaskExecutor = {
+    val worker: TaskExecutor = get(id)
+    worker.interrupt
+    //this.workerMap.remove(id)
+    return worker
+  }
+
+  def getWorkerStatus(state: Thread.State): String = {
+    if (whereIn(state, java.lang.Thread.State.RUNNABLE, java.lang.Thread.State.TIMED_WAITING, java.lang.Thread.State.WAITING)) {
+      return TopologyExecutionStatus.STARTED
+    }
+    else if (whereIn(state, java.lang.Thread.State.NEW)) {
+      return TopologyExecutionStatus.STARTING
+    }
+    else if (whereIn(state, java.lang.Thread.State.TERMINATED)) {
+      return TopologyExecutionStatus.STOPPED
+    }
+    throw new IllegalStateException("Unknown state: " + state)
+  }
+
+  def getTopologyStatus(status: String): String = {
+    if(whereIn(status, StormExecutionPlatform.KILLED))
+      return TopologyExecutionStatus.STOPPING
+    return TopologyExecutionStatus.STARTED
+  }
+
+  private def whereIn(status: String, inStatuses: String*): Boolean = {
+    for (_status <- inStatuses) {
+      if (_status.equalsIgnoreCase(status)) {
+        return true
+      }
+    }
+    return false
+  }
+  private def whereIn(state: Thread.State, inStates: Thread.State*): Boolean = {
+    for (_state <- inStates) {
+      if (_state eq state) {
+        return true
+      }
+    }
+    return false
+  }
+
+  def remove(id: AnyRef) {
+    val executor: TaskExecutor = this.get(id)
+    if (executor.isAlive) {
+      throw new RuntimeException("Failed to remove alive executor '" + id + "'")
+    }
+    else {
+      this.workerMap.remove(id)
+    }
+  }
+
+  def stopAll(): Unit ={
+    JavaConversions.collectionAsScalaIterable(workerMap.values()) foreach { worker =>
+      if(!worker.isInterrupted) {
+        worker.interrupt()
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
new file mode 100644
index 0000000..4c2df77
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.TopologyExecutionEntity
+
+
+object ApplicationManagerUtils {
+
+  def generateTopologyFullName(topologyExecution: TopologyExecutionEntity) = {
+    val fullName = "eagle-%s-%s-%s".format(topologyExecution.getSite, topologyExecution.getApplication, topologyExecution.getTopology)
+    fullName
+  }
+
+  def buildStormTopologyURL(config: Config, topologyID: String): String = {
+    val clusterURL = if(config.hasPath(AppManagerConstants.CLUSTER_URL)) config.getString(AppManagerConstants.CLUSTER_URL) else AppManagerConstants.DEFAULT_CLUSTER_URL
+    val topologyURL = clusterURL + "/topology.html?id=" + topologyID
+    topologyURL
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
new file mode 100644
index 0000000..ae0f6e8
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import java.util
+import java.util.concurrent.Callable
+
+import akka.dispatch.Futures
+import com.typesafe.config.Config
+import org.apache.eagle.alert.entity.SiteApplicationServiceEntity
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity
+import org.apache.eagle.policy.common.Constants
+import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, TopologyExecutionEntity, TopologyExecutionStatus, TopologyOperationEntity}
+import org.apache.eagle.service.client.EagleServiceConnector
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConversions
+import scala.concurrent.ExecutionContext
+
+
+class ApplicationSchedulerAsyncDAO(config: Config, ex: ExecutionContext) {
+  private val LOG: Logger = LoggerFactory.getLogger(classOf[ApplicationSchedulerAsyncDAO])
+  private val connector: EagleServiceConnector = new EagleServiceConnector(config)
+
+  def getEagleServiceClient(): EagleServiceClientImpl = {
+    return new EagleServiceClientImpl(connector)
+  }
+
+  def readOperationsByStatus(status: String) = {
+    Futures.future(new Callable[util.List[TopologyOperationEntity]]{
+      override def call(): util.List[TopologyOperationEntity] = {
+        val client = getEagleServiceClient()
+        val query = "%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, status)
+        val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] = client.search(query).pageSize(Int.MaxValue).send()
+        if(client != null) client.close()
+        if(!response.isSuccess || response.getObj == null)
+          throw new Exception(s"Fail to load operations with status $status")
+        response.getObj
+      }
+    }, ex)
+  }
+
+  def loadAllTopologyExecutionEntities() = {
+    Futures.future(new Callable[util.List[TopologyExecutionEntity]]{
+      override def call(): util.List[TopologyExecutionEntity] = {
+        val client = getEagleServiceClient()
+        val query = "%s[@status != \"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, TopologyExecutionStatus.NEW)
+        val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] = client.search(query).pageSize(Int.MaxValue).send()
+        if(client != null) client.close()
+        if(!response.isSuccess || response.getObj == null) throw new Exception(response.getException)
+        response.getObj
+      }
+    }, ex)
+  }
+
+  def loadTopologyExecutionByName(site: String, appName: String, topologyName: String) = {
+    Futures.future(new Callable[TopologyExecutionEntity]{
+      override def call(): TopologyExecutionEntity = {
+        val client = getEagleServiceClient()
+        val query = "%s[@site=\"%s\" AND @application=\"%s\" AND @topology=\"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, site, appName, topologyName)
+        LOG.info(s"query=$query")
+        val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] = client.search(query).pageSize(Int.MaxValue).send()
+        if(client != null) client.close()
+        if(!response.isSuccess || response.getObj == null)
+          throw new Exception(s"Fail to load topologyExecutionEntity with application=$appName topology=$topologyName due to Exception: ${response.getException}")
+        if(response.getObj.size() == 0 || response.getObj.size() > 1) {
+          throw new Exception(s"Get 0 or more than 1 topologyExecutionEntity with application=$appName topology=$topologyName")
+        }
+        response.getObj.get(0)
+      }
+    }, ex)
+  }
+
+  def loadTopologyDescriptionByName(site: String, application: String, topologyName: String) = {
+    Futures.future(new Callable[TopologyDescriptionEntity]{
+      override def call(): TopologyDescriptionEntity = {
+        val client = getEagleServiceClient()
+        var query = "%s[@topology=\"%s\"]{*}".format(Constants.TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME, topologyName)
+        val response: GenericServiceAPIResponseEntity[TopologyDescriptionEntity] = client.search(query).pageSize(Int.MaxValue).send()
+        if(!response.isSuccess || response.getObj == null || response.getObj.size() == 0)
+          throw new Exception(s"Fail to load TopologyDescriptionEntity with site=$site application=$application topology=$topologyName due to Exception: ${response.getException}")
+        val topologyDescriptionEntity = response.getObj.get(0)
+
+        query = "%s[@site=\"%s\" AND @application=\"%s\"]{*}".format(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME, site, application)
+        val configResponse: GenericServiceAPIResponseEntity[SiteApplicationServiceEntity] = client.search(query).pageSize(Int.MaxValue).send()
+        if (client != null) client.close()
+        if(!configResponse.isSuccess || configResponse.getObj == null || configResponse.getObj.size() == 0)
+          throw new Exception(s"Fail to load topology configuration with query=$query due to Exception: ${configResponse.getException}")
+        val siteApplicationEntity = configResponse.getObj.get(0)
+        topologyDescriptionEntity.setContext(siteApplicationEntity.getConfig)
+        topologyDescriptionEntity
+      }
+    }, ex)
+  }
+
+  def updateOperationStatus(operation: TopologyOperationEntity) = {
+    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
+      override def call(): GenericServiceAPIResponseEntity[String] = {
+        if(LOG.isDebugEnabled()) LOG.debug(s"Updating status of command[$operation] as ${operation.getStatus}")
+        val client = getEagleServiceClient()
+        operation.setLastModifiedDate(System.currentTimeMillis())
+        val response= client.update(java.util.Arrays.asList(operation), classOf[TopologyOperationEntity])
+        if(client != null) client.close()
+        if(response.isSuccess) {
+          LOG.info(s"Updated operation status [$operation] as: ${operation.getStatus}")
+        } else {
+          LOG.error(s"Failed to update status as ${operation.getStatus} of command[$operation]")
+          throw new RuntimeException(s"Failed to update command due to exception: ${response.getException}")
+        }
+        response
+      }
+    }, ex)
+  }
+
+  def updateTopologyExecutionStatus(topology: TopologyExecutionEntity) = {
+    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
+      override def call(): GenericServiceAPIResponseEntity[String] = {
+        if(LOG.isDebugEnabled()) LOG.debug(s"Updating status of app[$topology] as ${topology.getStatus}")
+        val client = getEagleServiceClient()
+        topology.setLastModifiedDate(System.currentTimeMillis())
+        if(client != null) client.close()
+        val response= client.update(java.util.Arrays.asList(topology), classOf[TopologyExecutionEntity])
+        if(response.isSuccess) {
+          LOG.info(s"Updated status application[$topology] as: ${topology.getStatus}")
+        } else {
+          LOG.error(s"Failed to update status as ${topology.getStatus} of application[$topology] due to ${response.getException}")
+        }
+        response
+      }
+    }, ex)
+  }
+
+  def clearPendingOperations() = {
+    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
+      override def call(): GenericServiceAPIResponseEntity[String] = {
+        LOG.info("start to clear operation")
+        val query: String = "%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, TopologyOperationEntity.OPERATION_STATUS.PENDING)
+        val client = getEagleServiceClient()
+        val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] = client.search(query).pageSize(Int.MaxValue).send()
+        var ret: GenericServiceAPIResponseEntity[String] = new GenericServiceAPIResponseEntity[String]()
+        if (response.isSuccess && response.getObj.size != 0) {
+          val pendingOperations: util.List[TopologyOperationEntity] = response.getObj
+          val failedOperations: util.List[TopologyOperationEntity] = new util.ArrayList[TopologyOperationEntity]
+          JavaConversions.collectionAsScalaIterable(pendingOperations) foreach { operation =>
+            operation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+            failedOperations.add(operation)
+          }
+          ret = client.update(failedOperations, Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME)
+          if (client != null) client.close()
+          if (ret.isSuccess) {
+            LOG.info(s"Successfully clear ${failedOperations.size()} pending operations")
+          } else {
+            LOG.error(s"Failed to clear pending operations due to exception:" + ret.getException)
+          }
+        }
+        ret
+      }
+    }, ex)
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
new file mode 100644
index 0000000..88271bb
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyDescriptionEntity}
+
+
+trait ExecutionPlatform {
+  def start(topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config)
+  def stop(topologyExecution: TopologyExecutionEntity, config: Config)
+  def status(topologyExecutions: java.util.List[TopologyExecutionEntity], config: Config)
+  def status(topologyExecution: TopologyExecutionEntity, config: Config)
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
new file mode 100644
index 0000000..6b9c033
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.stream.application.impl.StormExecutionPlatform
+import org.slf4j.{LoggerFactory, Logger}
+
+import scala.collection.mutable
+
+
+object ExecutionPlatformFactory {
+  private val LOG: Logger = LoggerFactory.getLogger(ExecutionPlatformFactory.getClass)
+
+  var managerCache = new mutable.HashMap[String, ExecutionPlatform] with
+    mutable.SynchronizedMap[String, ExecutionPlatform]
+
+  def getApplicationManager(managerType: String): ExecutionPlatform = {
+    if(managerCache.contains(managerType)) {
+      managerCache.get(managerType).get
+    } else {
+      managerType match {
+        case AppManagerConstants.EAGLE_CLUSTER_STORM =>
+          val instance = new StormExecutionPlatform
+          managerCache.put(managerType, instance)
+          instance
+        case _ =>
+          throw new Exception(s"Invalid managerType $managerType")
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
new file mode 100644
index 0000000..07737ac
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import org.codehaus.jackson.annotate.JsonIgnore
+
+class TaskExecutor(runnable: Runnable) extends Thread(runnable) {
+
+  @JsonIgnore override def getContextClassLoader: ClassLoader = {
+    return super.getContextClassLoader
+  }
+
+  @JsonIgnore override def getUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = {
+    return super.getUncaughtExceptionHandler
+  }
+
+  def shutdown {
+    this.interrupt
+  }
+
+  def restart {
+    this.interrupt
+    this.start
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
new file mode 100644
index 0000000..7d52649
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.impl
+
+import com.typesafe.config.Config
+import org.apache.eagle.datastream.ExecutionEnvironments
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
+import org.apache.eagle.stream.application.AbstractDynamicApplication
+import org.slf4j.LoggerFactory
+
+
+object StormDynamicTopology extends AbstractDynamicApplication {
+  val LOG = LoggerFactory.getLogger(classOf[AbstractDynamicApplication])
+
+  override def submit(application: String, config: Config) {
+    val stream = compileStream(application, config)
+    var ret = true
+
+    try {
+      val stormEnv = ExecutionEnvironments.getWithConfig[StormExecutionEnvironment](stream.getConfig)
+      stream.submit(stormEnv)
+    } catch {
+      case e: Throwable =>
+        ret = false
+        LOG.error(e.toString)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
new file mode 100644
index 0000000..5b1bb48
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.impl
+
+import java.net.URLDecoder
+import java.nio.file.{Files, Paths}
+
+import backtype.storm.generated.InvalidTopologyException
+import backtype.storm.utils.{NimbusClient, Utils}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.common.config.EagleConfigConstants
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, TopologyExecutionEntity, TopologyExecutionStatus}
+import org.apache.eagle.stream.application.{ApplicationManager, ApplicationManagerUtils, ExecutionPlatform, TopologyFactory}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions
+
+object StormExecutionPlatform {
+  val ACTIVE: String = "ACTIVE"
+  val INACTIVE: String = "INACTIVE"
+  val KILLED: String = "KILLED"
+  val REBALANCING: String = "REBALANCING"
+}
+
+class StormExecutionPlatform extends ExecutionPlatform {
+  val LOG = LoggerFactory.getLogger(classOf[StormExecutionPlatform])
+
+  private def getNimbusClient(appConfig: com.typesafe.config.Config): NimbusClient = {
+    val conf = Utils.readStormConfig().asInstanceOf[java.util.HashMap[String, Object]]
+    conf.putAll(Utils.readCommandLineOpts().asInstanceOf[java.util.HashMap[String, Object]])
+
+    if(appConfig.hasPath("envContextConfig.nimbusHost")) {
+      LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_HOST} as ${appConfig.getString("envContextConfig.nimbusHost")}")
+      conf.put(backtype.storm.Config.NIMBUS_HOST, appConfig.getString("envContextConfig.nimbusHost"))
+    }
+
+    if(appConfig.hasPath("envContextConfig.nimbusThriftPort")) {
+      LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_THRIFT_PORT} as ${appConfig.getString("envContextConfig.nimbusThriftPort")}")
+      conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, appConfig.getNumber("envContextConfig.nimbusThriftPort"))
+    }
+    NimbusClient.getConfiguredClient(conf)
+  }
+
+  def startLocal(topologyName: String, topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val worker: Thread = ApplicationManager.submit(topologyName, new Runnable {
+      override def run(): Unit = {
+        try {
+          val topologyType = topology.getType.toUpperCase()
+          topologyType match {
+            case TopologyDescriptionEntity.TYPE.CLASS =>
+              TopologyFactory.submit(topology.getExeClass, config)
+            case TopologyDescriptionEntity.TYPE.DYNAMIC =>
+              StormDynamicTopology.submit(topology.getExeClass, config)
+            case m@_ =>
+              LOG.error("Unsupported topology type: " + topology.getType)
+          }
+        } catch {
+          case ex: Throwable =>
+            LOG.error(s"topology $topologyName in local mode is interrupted with ${ex.toString}")
+        }
+      }
+    })
+    topologyExecution.setFullName(topologyName)
+    topologyExecution.setStatus(ApplicationManager.getWorkerStatus(worker.getState))
+    topologyExecution.setDescription("Running inside " + worker.toString + " in local mode")
+  }
+
+  override def start(topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val stormJarPath: String = URLDecoder.decode(classOf[ExecutionPlatform].getProtectionDomain.getCodeSource.getLocation.getPath, "UTF-8")
+    if (stormJarPath == null || !Files.exists(Paths.get(stormJarPath)) || !stormJarPath.endsWith(".jar")) {
+      val errMsg = s"storm jar file $stormJarPath does not exists, or is a invalid jar file"
+      LOG.error(errMsg)
+      throw new Exception(errMsg)
+    }
+    LOG.info(s"Detected a storm.jar location at: $stormJarPath")
+    System.setProperty("storm.jar", stormJarPath)
+
+    val fullName = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
+    val extConfigStr = "envContextConfig.topologyName=%s".format(fullName)
+    val extConfig = ConfigFactory.parseString(extConfigStr)
+    val newConfig = extConfig.withFallback(config)
+
+    val mode = if(config.hasPath(AppManagerConstants.RUNNING_MODE)) config.getString(AppManagerConstants.RUNNING_MODE) else EagleConfigConstants.LOCAL_MODE
+    topologyExecution.setMode(mode)
+    if (topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
+      startLocal(fullName, topology, topologyExecution, newConfig)
+      return
+    }
+
+    val topologyType = topology.getType.toUpperCase()
+    topologyType match {
+      case TopologyDescriptionEntity.TYPE.CLASS =>
+        TopologyFactory.submit(topology.getExeClass, newConfig)
+      case TopologyDescriptionEntity.TYPE.DYNAMIC =>
+        StormDynamicTopology.submit(topology.getExeClass, newConfig)
+      case m@_ =>
+        throw new InvalidTopologyException("Unsupported topology type: " + topology.getType)
+    }
+    topologyExecution.setFullName(fullName)
+    //topologyExecution.setStatus(TopologyExecutionStatus.STARTED)
+  }
+
+  override def stop(topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
+
+    if(topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
+      stopLocal(name, topologyExecution)
+    } else {
+      getNimbusClient(config).getClient.killTopology(name)
+      topologyExecution.setStatus(TopologyExecutionStatus.STOPPING)
+      //topologyExecution.setDescription("")
+    }
+  }
+
+  def stopLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = {
+      val taskWorker = ApplicationManager.stop(name)
+      topologyExecution.setStatus(ApplicationManager.getWorkerStatus(taskWorker.getState))
+      topologyExecution.setDescription(s"topology status is ${taskWorker.getState}")
+      /*try{
+        ApplicationManager.remove(name)
+      } catch {
+        case ex: IllegalArgumentException =>
+          LOG.warn(s"ApplicationManager.remove($name) failed as it has been removed")
+      }*/
+  }
+
+
+  def getTopology(topologyName: String, config: Config) = {
+    val topologySummery = getNimbusClient(config).getClient.getClusterInfo.get_topologies
+    JavaConversions.collectionAsScalaIterable(topologySummery).find { t => t.get_name.equals(topologyName) }
+    match {
+      case Some(t) => Some(t)
+      case None    => None
+    }
+  }
+
+  override def status(topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
+
+    if(topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
+      statusLocal(name, topologyExecution)
+    } else {
+      val topology = getTopology(name, config)
+      topology match {
+        case Some(topology) =>
+          topologyExecution.setStatus(ApplicationManager.getTopologyStatus(topology.get_status()))
+          topologyExecution.setUrl(ApplicationManagerUtils.buildStormTopologyURL(config, topology.get_id()))
+          topologyExecution.setDescription(topology.toString)
+        case None =>
+          topologyExecution.setStatus(TopologyExecutionStatus.STOPPED)
+          topologyExecution.setUrl("")
+          topologyExecution.setDescription(s"Fail to find topology: $name")
+      }
+    }
+  }
+
+  def statusLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = {
+    try {
+      val currentStatus = topologyExecution.getStatus()
+      val newStatus = ApplicationManager.getWorkerStatus(ApplicationManager.get(name).getState())
+      if (!currentStatus.equals(newStatus)) {
+        LOG.info("Status of topology: %s changed from %s to %s".format(topologyExecution.getFullName, currentStatus, newStatus))
+        topologyExecution.setStatus(newStatus)
+        topologyExecution.setDescription(String.format("Status of topology: %s changed from %s to %s", name, currentStatus, newStatus))
+      } else if(currentStatus.equalsIgnoreCase(TopologyExecutionStatus.STOPPED)) {
+        ApplicationManager.remove(name)
+      }
+    }catch {
+      case ex: Throwable =>
+        topologyExecution.setDescription("")
+        topologyExecution.setStatus(TopologyExecutionStatus.STOPPED)
+    }
+  }
+
+  override def status(topologyExecutions: java.util.List[TopologyExecutionEntity], config: Config): Unit = {
+    JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach {
+      topologyExecution => status(topologyExecution, config)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
new file mode 100644
index 0000000..8fbf60d
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import java.util.concurrent.Callable
+
+import akka.actor.{Actor, ActorLogging}
+import akka.dispatch.Futures
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigSyntax}
+import org.apache.eagle.common.config.EagleConfigConstants
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION
+import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyExecutionStatus, TopologyOperationEntity}
+import org.apache.eagle.stream.application.{ApplicationSchedulerAsyncDAO, ExecutionPlatformFactory}
+
+import scala.collection.JavaConversions
+import scala.util.{Failure, Success}
+
+
+private[scheduler] class AppCommandExecutor extends Actor with ActorLogging {
+  @volatile var _config: Config = _
+  @volatile var _dao: ApplicationSchedulerAsyncDAO = _
+
+  import context.dispatcher
+
+  def start(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = {
+    val options: ConfigParseOptions = ConfigParseOptions.defaults.setSyntax(ConfigSyntax.PROPERTIES).setAllowMissing(false)
+    _dao.loadTopologyDescriptionByName(topologyOperation.getSite, topologyOperation.getApplication, topologyOperation.getTopology) onComplete {
+      case Success(topology) =>
+        val topologyConfig: Config = ConfigFactory.parseString(topology.getContext, options)
+
+        if(!topologyConfig.hasPath(EagleConfigConstants.APP_CONFIG)) {
+          topologyOperation.setMessage("Fail to detect topology configuration")
+          topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+          _dao.updateOperationStatus(topologyOperation)
+        } else {
+          val config = topologyConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(_config)
+          val clusterType = if(config.hasPath(AppManagerConstants.CLUSTER_ENV)) config.getString(AppManagerConstants.CLUSTER_ENV) else AppManagerConstants.EAGLE_CLUSTER_STORM
+          topologyExecution.setEnvironment(clusterType)
+
+          Futures.future(new Callable[TopologyExecutionEntity]{
+            override def call(): TopologyExecutionEntity = {
+              topologyExecution.setStatus(TopologyExecutionStatus.STARTING)
+              _dao.updateTopologyExecutionStatus(topologyExecution)
+              ExecutionPlatformFactory.getApplicationManager(clusterType).start(topology, topologyExecution, config)
+              topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+              topologyExecution
+            }
+          }, context.dispatcher) onComplete {
+            case Success(topologyExecutionEntity) =>
+              topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+              updateStatus(topologyExecution, topologyOperation)
+            case Failure(ex) =>
+              topologyOperation.setMessage(ex.getMessage)
+              topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+              _dao.updateOperationStatus(topologyOperation)
+          }
+        }
+
+      case Failure(ex) =>
+        topologyOperation.setMessage(ex.getMessage)
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+        _dao.updateOperationStatus(topologyOperation)
+    }
+  }
+
+  def stop(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = {
+    val clusterType = topologyExecution.getEnvironment
+
+    Futures.future(new Callable[TopologyExecutionEntity]{
+      override def call(): TopologyExecutionEntity = {
+        topologyExecution.setStatus(TopologyExecutionStatus.STOPPING)
+        _dao.updateTopologyExecutionStatus(topologyExecution)
+        ExecutionPlatformFactory.getApplicationManager(clusterType).stop(topologyExecution, _config)
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+        topologyExecution
+      }
+    }, context.dispatcher) onComplete {
+      case Success(topologyExecutionEntity) =>
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+        updateStatus(topologyExecution, topologyOperation)
+      case Failure(ex) =>
+        topologyOperation.setMessage(ex.toString)
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+        _dao.updateOperationStatus(topologyOperation)
+    }
+  }
+
+  def status(topologyExecution: TopologyExecutionEntity) = {
+    val clusterType = topologyExecution.getEnvironment
+
+    Futures.future(new Callable[TopologyExecutionEntity]{
+      override def call(): TopologyExecutionEntity = {
+        ExecutionPlatformFactory.getApplicationManager(clusterType).status(topologyExecution, _config)
+        topologyExecution
+      }
+    }, context.dispatcher) onComplete {
+      case _ =>
+        _dao.updateTopologyExecutionStatus(topologyExecution)
+    }
+  }
+
+  def updateStatus(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = {
+    _dao.updateOperationStatus(topologyOperation)
+    _dao.updateTopologyExecutionStatus(topologyExecution)
+  }
+
+  def execute(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = {
+    try {
+      topologyOperation.getOperation match {
+        case OPERATION.START =>
+          start(topologyExecution, topologyOperation)
+        case OPERATION.STOP =>
+          stop(topologyExecution, topologyOperation)
+        case m@_ =>
+          log.warning("Unsupported operation: " + topologyOperation)
+          throw new Exception(s"Unsupported operation: ${topologyOperation.getOperation}, possible values are START/STOP")
+      }
+    } catch {
+      case e: Throwable =>
+        topologyOperation.setMessage(e.getMessage)
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+        _dao.updateOperationStatus(topologyOperation)
+    }
+  }
+
+  override def receive = {
+    case InitializationEvent(config: Config) =>
+      _config = config
+      _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher)
+    case SchedulerCommand(topologyExecution, topologyOperation) =>
+      execute(topologyExecution, topologyOperation)
+    case HealthCheckerEvent =>
+      _dao.loadAllTopologyExecutionEntities() onComplete {
+        case Success(topologyExecutions) =>
+          log.info(s"Load ${topologyExecutions.size()} topologies in execution")
+          JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach { topologyExecution =>
+            try{
+              status(topologyExecution)
+            } catch {
+              case ex: Throwable =>
+                log.error(ex.getMessage)
+            }
+          }
+        case Failure(ex) =>
+          log.error(s"Fail to load any topologyExecutionEntity due to Exception: ${ex.getMessage}")
+      }
+    case TerminatedEvent =>
+      context.stop(self)
+    case m@_ =>
+      log.warning("Unsupported operation $m")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
new file mode 100644
index 0000000..c731846
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{Actor, ActorLogging}
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION_STATUS
+import org.apache.eagle.stream.application.ApplicationSchedulerAsyncDAO
+
+import scala.collection.JavaConversions
+import scala.util.{Failure, Success}
+
+
+private[scheduler] class AppCommandLoader extends Actor with ActorLogging {
+  @volatile var _config: Config = null
+  @volatile var _dao: ApplicationSchedulerAsyncDAO = null
+
+  import context.dispatcher
+
+  override def receive = {
+    case InitializationEvent(config: Config) =>
+      _config = config
+      _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher)
+    case ClearPendingOperation =>
+      if(_dao == null) _dao = new ApplicationSchedulerAsyncDAO(_config, context.dispatcher)
+      _dao.clearPendingOperations()
+    case CommandLoaderEvent => {
+      val _sender = sender()
+      _dao.readOperationsByStatus(OPERATION_STATUS.INITIALIZED) onComplete {
+        case Success(commands) => {
+          log.info(s"Load ${commands.size()} new commands")
+          JavaConversions.collectionAsScalaIterable(commands) foreach { command =>
+            command.setStatus(OPERATION_STATUS.PENDING)
+            _dao.updateOperationStatus(command) onComplete {
+              case Success(response) =>
+                _dao.loadTopologyExecutionByName(command.getSite, command.getApplication, command.getTopology) onComplete {
+                  case Success(topologyExecution) => {
+                    _sender ! SchedulerCommand(topologyExecution, command)
+                  }
+                  case Failure(ex) =>
+                    log.error(ex.getMessage)
+                    command.setMessage(ex.getMessage)
+                    command.setStatus(OPERATION_STATUS.FAILED)
+                    _dao.updateOperationStatus(command)
+                }
+              case Failure(ex) =>
+                log.error(s"Got an exception to update command status $command: ${ex.getMessage}")
+                command.setMessage(ex.getMessage)
+                command.setStatus(OPERATION_STATUS.FAILED)
+                _dao.updateOperationStatus(command)
+            }
+          }
+        }
+        case Failure(ex) =>
+          log.error(s"Failed to get commands due to exception ${ex.getMessage}")
+      }
+    }
+    case TerminatedEvent =>
+      context.stop(self)
+    case m@_ => throw new UnsupportedOperationException(s"Event is not supported $m")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
new file mode 100644
index 0000000..476a3fb
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyOperationEntity}
+import org.apache.eagle.stream.application.ApplicationManager
+
+import scala.concurrent.duration._
+
+
+private[scheduler] class ScheduleEvent
+private[scheduler] case class InitializationEvent(config: Config) extends ScheduleEvent
+private[scheduler] case class TerminatedEvent() extends ScheduleEvent
+private[scheduler] case class CommandLoaderEvent() extends ScheduleEvent
+private[scheduler] case class HealthCheckerEvent() extends ScheduleEvent
+private[scheduler] case class ClearPendingOperation() extends ScheduleEvent
+private[scheduler] case class SchedulerCommand(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) extends ScheduleEvent
+
+case class EagleServiceUnavailableException(message:String) extends Exception(message)
+case class DuplicatedDefinitionException(message:String) extends Exception(message)
+case class LoadTopologyFailureException(message:String) extends Exception(message)
+
+
+/**
+ * 1. Sync command from eagle service
+ * 2. Coordinate command to different actor
+ * 3. Actor execute command as requested
+ */
+class ApplicationScheduler {
+  //val config = ConfigFactory.load()
+  val DEFAULT_COMMAND_LOADER_INTERVAL_SECS = 2
+  val DEFAULT_HEALTH_CHECK_INTERVAL_SECS = 5
+
+  def start(config: Config) = {
+    val system = ActorSystem("application-manager-scheduler", config)
+    system.log.info(s"Started actor system: $system")
+
+    import system.dispatcher
+
+    val commandLoaderIntervalSecs: Long = if(config.hasPath(AppManagerConstants.APP_COMMAND_LOADER_INTERVAL_SECS)) config.getLong(AppManagerConstants.APP_COMMAND_LOADER_INTERVAL_SECS) else DEFAULT_COMMAND_LOADER_INTERVAL_SECS
+    val healthCheckIntervalSecs: Long = if(config.hasPath(AppManagerConstants.APP_HEALTH_CHECK_INTERVAL_SECS)) config.getLong(AppManagerConstants.APP_HEALTH_CHECK_INTERVAL_SECS) else DEFAULT_HEALTH_CHECK_INTERVAL_SECS
+
+    val coordinator = system.actorOf(Props[StreamAppCoordinator])
+    system.scheduler.scheduleOnce(0 seconds, coordinator, InitializationEvent(config))
+    system.scheduler.scheduleOnce(1 seconds, coordinator, ClearPendingOperation)
+    system.scheduler.schedule(2.seconds, commandLoaderIntervalSecs.seconds, coordinator, CommandLoaderEvent)
+    system.scheduler.schedule(10.seconds, healthCheckIntervalSecs.seconds, coordinator, HealthCheckerEvent)
+
+    /*
+     registerOnTermination is called when you have shut down the ActorSystem (system.shutdown),
+     and the callbacks will be executed after all actors have been stopped.
+     */
+    system.registerOnTermination(new Runnable {
+      override def run(): Unit = {
+        coordinator ! TerminatedEvent
+        ApplicationManager.stopAll()
+      }
+    })
+    system
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
new file mode 100644
index 0000000..17006ee
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{Actor, ActorLogging, ActorRef, Props}
+
+private[scheduler] class StreamAppCoordinator extends Actor with ActorLogging {
+  var commandLoader: ActorRef = null
+  var commandExecutor: ActorRef = null
+
+
+  override def preStart(): Unit = {
+    commandLoader = context.actorOf(Props[AppCommandLoader], "command-loader")
+    commandExecutor = context.actorOf(Props[AppCommandExecutor], "command-worker")
+  }
+
+  override def receive = {
+    case InitializationEvent(config) => {
+      log.info(s"Config updated: $config")
+      commandLoader ! InitializationEvent(config)
+      commandExecutor ! InitializationEvent(config)
+    }
+    case ClearPendingOperation =>
+      commandLoader ! ClearPendingOperation
+    case CommandLoaderEvent =>
+      commandLoader ! CommandLoaderEvent
+    case command: SchedulerCommand =>
+      log.info(s"Executing command: $SchedulerCommand")
+      commandExecutor ! command
+    case HealthCheckerEvent =>
+      commandExecutor ! HealthCheckerEvent
+    case TerminatedEvent =>
+      log.info("Coordinator exit ...")
+      context.stop(self)
+    case m@_ =>
+      log.warning(s"Coordinator Unsupported message: $m")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/application.conf b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/application.conf
new file mode 100644
index 0000000..4c21a7c
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/application.conf
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+### scheduler propertise
+appCommandLoaderIntervalSecs = 1
+appHealthCheckIntervalSecs = 5
+
+### execution platform properties
+envContextConfig.env = "storm"
+envContextConfig.url = "http://sandbox.hortonworks.com:8744"
+envContextConfig.nimbusHost = "sandbox.hortonworks.com"
+envContextConfig.nimbusThriftPort = 6627
+envContextConfig.jarFile = "/dir-to-jar/eagle-topology-0.3.0-incubating-assembly.jar"
+
+### default topology properties
+eagleProps.mailHost = "mailHost.com"
+eagleProps.mailSmtpPort = "25"
+eagleProps.mailDebug = "true"
+eagleProps.eagleService.host = "localhost"
+eagleProps.eagleService.port = 9099
+eagleProps.eagleService.username = "admin"
+eagleProps.eagleService.password = "secret"
+eagleProps.dataJoinPollIntervalSec = 30
+
+dynamicConfigSource.enabled = true
+dynamicConfigSource.initDelayMillis = 0
+dynamicConfigSource.delayMillis = 30000
+
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/log4j.properties b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/log4j.properties
new file mode 100644
index 0000000..25331ab
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+ eagle.log.dir=../logs
+ eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
new file mode 100644
index 0000000..e87ee92
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import com.typesafe.config.Config
+import org.apache.eagle.stream.application.TopologyExecutable
+import org.slf4j.LoggerFactory
+
+class MockTopology extends TopologyExecutable {
+  private val LOG = LoggerFactory.getLogger(classOf[MockTopology])
+  override def submit(topology: String, config: Config): Unit = {
+    LOG.info(s"$topology is running")
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
new file mode 100644
index 0000000..1cad3a7
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
@@ -0,0 +1,40 @@
+package org.apache.eagle.stream.application.scheduler
+
+import com.typesafe.config.ConfigFactory
+import org.apache.eagle.common.config.EagleConfigConstants
+import org.apache.eagle.stream.application.ExecutionPlatform
+import org.apache.eagle.stream.application.impl.StormExecutionPlatform
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+
+object StormApplicationManagerSpec extends App {
+  val manager: ExecutionPlatform = new StormExecutionPlatform
+  val baseConfig = ConfigFactory.load()
+  val topoConfigStr: String = "webConfig{\"hbase.zookeeper.property.clientPort\":\"2181\", \"hbase.zookeeper.quorum\":\"localhost\"}\nappConfig{\n  \"envContextConfig\" : {\n    \"env\" : \"storm\",\n    \"mode\" : \"cluster\",\n    \"topologyName\" : \"sandbox-hbaseSecurityLog-topology\",\n    \"stormConfigFile\" : \"security-auditlog-storm.yaml\",\n    \"parallelismConfig\" : {\n      \"kafkaMsgConsumer\" : 1,\n      \"hbaseSecurityLogAlertExecutor*\" : 1\n    }\n  },\n  \"dataSourceConfig\": {\n    \"topic\" : \"sandbox_hbase_security_log\",\n    \"zkConnection\" : \"127.0.0.1:2181\",\n    \"zkConnectionTimeoutMS\" : 15000,\n    \"brokerZkPath\" : \"/brokers\",\n    \"fetchSize\" : 1048586,\n    \"deserializerClass\" : \"org.apache.eagle.security.hbase.parse.HbaseAuditLogKafkaDeserializer\",\n    \"transactionZKServers\" : \"127.0.0.1\",\n    \"transactionZKPort\" : 2181,\n    \"transactionZKRoot\" : \"/consumers\",\n    \"consumerGroupId\" : \"eagle.hbasesecurity.consumer\",\n  
   \"transactionStateUpdateMS\" : 2000\n  },\n  \"alertExecutorConfigs\" : {\n     \"hbaseSecurityLogAlertExecutor\" : {\n       \"parallelism\" : 1,\n       \"partitioner\" : \"org.apache.eagle.policy.DefaultPolicyPartitioner\"\n       \"needValidation\" : \"true\"\n     }\n  },\n  \"eagleProps\" : {\n    \"site\" : \"sandbox\",\n    \"application\": \"hbaseSecurityLog\",\n    \"dataJoinPollIntervalSec\" : 30,\n    \"mailHost\" : \"mailHost.com\",\n    \"mailSmtpPort\":\"25\",\n    \"mailDebug\" : \"true\",\n    \"eagleService\": {\n      \"host\": \"localhost\",\n      \"port\": 9099\n      \"username\": \"admin\",\n      \"password\": \"secret\"\n    }\n  },\n  \"dynamicConfigSource\" : {\n    \"enabled\" : true,\n    \"initDelayMillis\" : 0,\n    \"delayMillis\" : 30000\n  }\n}"
+
+  val topoConfig = ConfigFactory.parseString(topoConfigStr)
+  val conf = topoConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(baseConfig)
+
+  //val (ret, nextState) = manager.execute("START", topologyDescModel, null, conf)
+  //println(s"Result: ret=$ret, nextState=$nextState")
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
new file mode 100644
index 0000000..3db2d67
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.{TestActorRef, TestKit}
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{Ignore, BeforeAndAfterAll, MustMatchers, WordSpecLike}
+
+@Ignore
+class TestSchedulerSpec extends TestKit(ActorSystem("stream-app-scheduler"))
+with WordSpecLike with MustMatchers with BeforeAndAfterAll {
+
+  "A Scheduler actor" must {
+    "Forward a message it receives" in {
+      val coordinator = TestActorRef[StreamAppCoordinator]
+      coordinator ! CommandLoaderEvent
+      expectNoMsg()
+    }
+  }
+
+  "A Integrated test" must {
+    "run end-to-end" in {
+      val coordinator = system.actorOf(Props[StreamAppCoordinator])
+      coordinator ! CommandLoaderEvent
+      expectNoMsg()
+    }
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    system.shutdown()
+  }
+}
+
+@Ignore
+object TestStreamAppScheduler extends App {
+  val conf: String = """
+                          akka.loglevel = "DEBUG"
+                          akka.actor.debug {
+                            receive = on
+                            lifecycle = on
+                          }
+                     """
+  new ApplicationScheduler().start(ConfigFactory.parseString(conf))
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/pom.xml b/eagle-core/eagle-application-management/pom.xml
new file mode 100644
index 0000000..0637d7e
--- /dev/null
+++ b/eagle-core/eagle-application-management/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>eagle-core</artifactId>
+        <groupId>org.apache.eagle</groupId>
+        <version>0.3.0-incubating</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>eagle-application-management</artifactId>
+    <packaging>pom</packaging>
+    <description>Eagle Application Management</description>
+
+    <modules>
+        <module>eagle-stream-application-manager</module>
+        <module>eagle-application-service</module>
+    </modules>
+
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
index cc1e009..eb09156 100644
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
@@ -78,6 +78,12 @@ trait PipelineParser{
   }
 
   def parseString(config:String):Pipeline = parse(ConfigFactory.parseString(config))
+
+  def parseStringWithConfig(dataFlow:String, config: Config) = {
+    val pConfig = config.withFallback(ConfigFactory.parseString(dataFlow))
+    parse(pConfig)
+  }
+
   def parseResource(resource:String):Pipeline = {
     // TODO: Load environment, currently hard-code with storm
     if(resource.startsWith("/") || resource.startsWith("./")){

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
index 5d64c4c..54d09e6 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
@@ -22,6 +22,7 @@ import _root_.storm.trident.spout.RichSpoutBatchExecutor
 import backtype.storm.generated.StormTopology
 import backtype.storm.utils.Utils
 import backtype.storm.{Config, LocalCluster, StormSubmitter}
+import org.apache.eagle.common.config.EagleConfigConstants
 import org.apache.eagle.datastream.core.AbstractTopologyExecutor
 import org.apache.thrift7.transport.TTransportException
 import org.slf4j.LoggerFactory
@@ -31,7 +32,7 @@ case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesa
   val LOG = LoggerFactory.getLogger(classOf[StormTopologyExecutorImpl])
   @throws(classOf[Exception])
   def execute {
-    val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase("local")
+    val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)
     val conf: Config = new Config
     conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024))
     conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8))
@@ -91,12 +92,15 @@ case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesa
       LOG.info("Submitting as local mode")
       val cluster: LocalCluster = new LocalCluster
       cluster.submitTopology(topologyName, conf, topology)
-      while(true) {
-        try {
+      try {
+        while(true) {
           Utils.sleep(Integer.MAX_VALUE)
-        } catch {
-          case _: Throwable => () // Do nothing
         }
+      } catch {
+        case ex: Throwable =>
+          LOG.warn("Sleep is interrupted with " + ex.toString)
+          cluster.killTopology(topologyName)
+          cluster.shutdown
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
index ca65669..2a33460 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
@@ -32,6 +32,10 @@ public class Constants {
 	public static final String APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME = "ApplicationDescService";
 	public static final String FEATURE_DESCRIPTION_SERVICE_ENDPOINT_NAME = "FeatureDescService";
 
+	public static final String TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME = "TopologyExecutionService";
+	public static final String TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME = "TopologyOperationService";
+	public static final String TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME = "TopologyDescriptionService";
+
 	public static final String GENERIC_RESOURCE_SERVICE_ENDPOINT_NAME = "GenericResourceService";
 	
 	public final static String AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME = "AggregateDefinitionService";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/config/EagleConfigConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/config/EagleConfigConstants.java b/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/config/EagleConfigConstants.java
index 0a035b8..26d7b49 100644
--- a/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/config/EagleConfigConstants.java
+++ b/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/config/EagleConfigConstants.java
@@ -58,5 +58,8 @@ public final class EagleConfigConstants {
 
     public final static String WEB_CONFIG = "web";
     public final static String APP_CONFIG = "app";
+    public final static String CLASSIFICATION_CONFIG = "classification";
 
+    public final static String LOCAL_MODE = "local";
+    public final static String CLUSTER_MODE = "cluster";
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/operation/CompiledQuery.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/operation/CompiledQuery.java b/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/operation/CompiledQuery.java
index 34761cb..e3368ab 100644
--- a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/operation/CompiledQuery.java
+++ b/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/operation/CompiledQuery.java
@@ -214,10 +214,23 @@ public class CompiledQuery {
         EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
         if(ed.isTimeSeries()){
             // TODO check Time exists for timeseries or topology data
-            this.searchCondition.setStartTime(this.rawQuery.getStartTime());
-            this.searchCondition.setEndTime(this.rawQuery.getEndTime());
-            this.setStartTime(DateTimeUtil.humanDateToSeconds(this.getRawQuery().getStartTime()) * 1000);
-            this.setEndTime(DateTimeUtil.humanDateToSeconds(this.getRawQuery().getEndTime()) * 1000);
+            long endTimeMillis = System.currentTimeMillis();
+            long startTimeMills = endTimeMillis - 30 * DateTimeUtil.ONEDAY;
+            String endTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(endTimeMillis);
+            String startTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(startTimeMills);
+
+            if(this.rawQuery.getStartTime() != null && this.rawQuery.getEndTime() != null) {
+                endTime = this.rawQuery.getEndTime();
+                startTime = this.rawQuery.getStartTime();
+                endTimeMillis = DateTimeUtil.humanDateToSeconds(endTime) * 1000;
+                startTimeMills = DateTimeUtil.humanDateToSeconds(startTime) * 1000;
+            } else {
+                LOG.warn("startTime or endTime is not given, use [currentSystemTime - 30 days, currentSystemTime]");
+            }
+            this.searchCondition.setStartTime(startTime);
+            this.searchCondition.setEndTime(endTime);
+            this.setStartTime(startTimeMills);
+            this.setEndTime(endTimeMillis);
         }else{
             this.searchCondition.setStartTime("0");
             this.searchCondition.setEndTime("1");