You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by mw...@apache.org on 2016/07/25 09:36:40 UTC
[03/47] incubator-eagle git commit: EAGLE-271 Topology management in
remote/local mode including start/stop operations
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
new file mode 100644
index 0000000..e32f48e
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/java/org/apache/eagle/stream/application/TopologyFactory.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application;
+
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Creates, caches and submits {@link TopologyExecutable} instances, keyed by
+ * their fully-qualified class name.
+ *
+ * <p>At most one instance is created per class name; lookups and creation are
+ * serialized on the cache so concurrent callers share the same instance.
+ */
+public final class TopologyFactory {
+    public static final Logger LOG = LoggerFactory.getLogger(TopologyFactory.class);
+    private static final Map<String, TopologyExecutable> topologyCache = Collections.synchronizedMap(new HashMap<String, TopologyExecutable>());
+
+    /**
+     * Returns the cached topology for {@code topologyClass}, instantiating it
+     * through its no-argument constructor on first use.
+     *
+     * @param topologyClass fully-qualified name of a {@link TopologyExecutable} implementation
+     * @return the shared instance for that class name
+     * @throws TopologyException if the class cannot be found, cannot be
+     *         instantiated, or is not a {@link TopologyExecutable}
+     */
+    public static TopologyExecutable getTopologyInstance(String topologyClass) throws TopologyException {
+        // Collections.synchronizedMap locks on the map itself, so holding the same
+        // monitor makes the check-then-create sequence atomic.
+        synchronized (topologyCache) {
+            TopologyExecutable instance = topologyCache.get(topologyClass);
+            if (instance != null) {
+                return instance;
+            }
+            try {
+                // Parameterized logging also tolerates a null (bootstrap) class loader.
+                LOG.info("load class {} with classLoader {}", topologyClass, TopologyFactory.class.getClassLoader());
+                instance = (TopologyExecutable) Class.forName(topologyClass).newInstance();
+                topologyCache.put(topologyClass, instance);
+                return instance;
+            } catch (ClassNotFoundException e) {
+                throw new TopologyException("Topology in type of " + topologyClass + " is not found", e);
+            } catch (ClassCastException e) {
+                throw new TopologyException("Topology in type of " + topologyClass + " is not a TopologyExecutable", e);
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new TopologyException(e);
+            }
+        }
+    }
+
+    /**
+     * Resolves the topology for {@code topologyClass} and submits it, passing
+     * the class name and {@code config} through to the topology.
+     *
+     * @param topologyClass fully-qualified name of the topology class
+     * @param config        configuration handed to the topology
+     * @throws TopologyException if the topology cannot be resolved
+     */
+    public static void submit(String topologyClass, Config config) throws TopologyException {
+        TopologyExecutable topology = getTopologyInstance(topologyClass);
+        topology.submit(topologyClass, config);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
new file mode 100644
index 0000000..3e918cc
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/AbstractDynamicApplication.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import com.typesafe.config.Config
+import org.apache.eagle.datastream.core.StreamContext
+import org.apache.eagle.stream.pipeline.Pipeline
+
+
+/**
+ * Base trait for topologies whose stream is built at runtime from a pipeline
+ * definition string.
+ */
+trait AbstractDynamicApplication extends TopologyExecutable {
+  /**
+   * Parses `application` together with `config` into a pipeline and compiles
+   * that pipeline into a [[StreamContext]].
+   */
+  def compileStream(application: String, config: Config): StreamContext =
+    Pipeline.compile(Pipeline.parseStringWithConfig(application, config))
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
new file mode 100644
index 0000000..bbfaedd
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManager.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import java.util
+
+import com.google.common.base.Preconditions
+import org.apache.eagle.service.application.entity.TopologyExecutionStatus
+import org.apache.eagle.stream.application.impl.StormExecutionPlatform
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConversions
+
+
+/**
+ * Registry of [[TaskExecutor]] threads keyed by a caller-supplied id, with
+ * helpers to start, look up, interrupt and remove them and to translate
+ * thread / Storm states into `TopologyExecutionStatus` values.
+ *
+ * NOTE(review): `workerMap` is a plain `java.util.TreeMap`, so ids must be
+ * mutually comparable and access is not synchronized - confirm callers are
+ * single-threaded or add external locking.
+ */
+object ApplicationManager {
+  private val LOG: Logger = LoggerFactory.getLogger(ApplicationManager.getClass)
+  private val workerMap: util.Map[AnyRef, TaskExecutor] = new util.TreeMap[AnyRef, TaskExecutor]
+
+  /** Returns the live (mutable) id-to-executor map. */
+  def getWorkerMap: util.Map[AnyRef, TaskExecutor] = workerMap
+
+  /**
+   * Starts `runnable` on a new daemon [[TaskExecutor]] registered under `id`.
+   * An existing executor with the same id is replaced only when it is dead.
+   *
+   * @throws IllegalArgumentException if a live executor is already registered under `id`
+   */
+  def submit(id: AnyRef, runnable: Runnable): TaskExecutor = {
+    if (workerMap.containsKey(id)) {
+      val executor: Thread = workerMap.get(id)
+      // The original compared the state with a bare `equals()`, which never
+      // matches a Thread.State; the intended check is against TERMINATED.
+      if (!executor.isAlive || executor.getState == Thread.State.TERMINATED) {
+        LOG.info("Replacing dead executor: {}", executor)
+        workerMap.remove(id)
+      } else {
+        throw new IllegalArgumentException("Duplicated id '" + id + "'")
+      }
+    }
+    val worker: TaskExecutor = new TaskExecutor(runnable)
+    LOG.info("Registering new executor %s: %s".format(id, worker))
+    workerMap.put(id, worker)
+    worker.setName(id.toString)
+    worker.setDaemon(true)
+    worker.start()
+    worker
+  }
+
+  /**
+   * Returns the executor registered under `id`.
+   *
+   * @throws IllegalArgumentException if no executor is registered under `id`
+   */
+  def get(id: AnyRef): TaskExecutor = {
+    Preconditions.checkArgument(workerMap.containsKey(id))
+    workerMap.get(id)
+  }
+
+  /**
+   * Interrupts the executor registered under `id` and returns it. The entry
+   * stays in the registry; call `remove(id)` once the thread is no longer alive.
+   */
+  @throws(classOf[Exception])
+  def stop(id: AnyRef): TaskExecutor = {
+    val worker: TaskExecutor = get(id)
+    worker.interrupt()
+    worker
+  }
+
+  /**
+   * Maps a JVM thread state onto a `TopologyExecutionStatus` value.
+   * RUNNABLE, BLOCKED, WAITING and TIMED_WAITING all describe a started
+   * thread; NEW maps to STARTING and TERMINATED to STOPPED.
+   *
+   * @throws IllegalStateException if `state` is none of the known states (e.g. null)
+   */
+  def getWorkerStatus(state: Thread.State): String = {
+    if (whereIn(state, Thread.State.RUNNABLE, Thread.State.TIMED_WAITING, Thread.State.WAITING, Thread.State.BLOCKED)) {
+      TopologyExecutionStatus.STARTED
+    } else if (whereIn(state, Thread.State.NEW)) {
+      TopologyExecutionStatus.STARTING
+    } else if (whereIn(state, Thread.State.TERMINATED)) {
+      TopologyExecutionStatus.STOPPED
+    } else {
+      throw new IllegalStateException("Unknown state: " + state)
+    }
+  }
+
+  /**
+   * Maps a Storm topology status string onto a `TopologyExecutionStatus`
+   * value: KILLED (case-insensitive) becomes STOPPING, anything else STARTED.
+   */
+  def getTopologyStatus(status: String): String = {
+    if (whereIn(status, StormExecutionPlatform.KILLED)) TopologyExecutionStatus.STOPPING
+    else TopologyExecutionStatus.STARTED
+  }
+
+  /** True when `status` equals, ignoring case, any of `inStatuses`. */
+  private def whereIn(status: String, inStatuses: String*): Boolean =
+    inStatuses.exists(_.equalsIgnoreCase(status))
+
+  /** True when `state` is one of `inStates`. */
+  private def whereIn(state: Thread.State, inStates: Thread.State*): Boolean =
+    inStates.exists(_ eq state)
+
+  /**
+   * Drops the executor registered under `id` from the registry.
+   *
+   * @throws IllegalArgumentException if no executor is registered under `id`
+   * @throws RuntimeException if the executor is still alive
+   */
+  def remove(id: AnyRef): Unit = {
+    val executor: TaskExecutor = this.get(id)
+    if (executor.isAlive) {
+      throw new RuntimeException("Failed to remove alive executor '" + id + "'")
+    }
+    this.workerMap.remove(id)
+  }
+
+  /** Interrupts every registered executor that is not already interrupted. */
+  def stopAll(): Unit = {
+    JavaConversions.collectionAsScalaIterable(workerMap.values()) foreach { worker =>
+      if (!worker.isInterrupted) {
+        worker.interrupt()
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
new file mode 100644
index 0000000..4c2df77
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationManagerUtils.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.TopologyExecutionEntity
+
+
+/** Naming and URL helpers shared by the application manager. */
+object ApplicationManagerUtils {
+
+  /**
+   * Builds the full topology name in the form
+   * `eagle-<site>-<application>-<topology>` from the execution entity.
+   */
+  def generateTopologyFullName(topologyExecution: TopologyExecutionEntity) = {
+    val site = topologyExecution.getSite
+    val application = topologyExecution.getApplication
+    val topology = topologyExecution.getTopology
+    "eagle-%s-%s-%s".format(site, application, topology)
+  }
+
+  /**
+   * Builds the topology page URL for `topologyID`. The cluster base URL is
+   * read from `AppManagerConstants.CLUSTER_URL` when the config defines that
+   * path, otherwise `AppManagerConstants.DEFAULT_CLUSTER_URL` is used.
+   */
+  def buildStormTopologyURL(config: Config, topologyID: String): String = {
+    val baseURL =
+      if (config.hasPath(AppManagerConstants.CLUSTER_URL)) {
+        config.getString(AppManagerConstants.CLUSTER_URL)
+      } else {
+        AppManagerConstants.DEFAULT_CLUSTER_URL
+      }
+    baseURL + "/topology.html?id=" + topologyID
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
new file mode 100644
index 0000000..ae0f6e8
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ApplicationSchedulerAsyncDAO.scala
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import java.util
+import java.util.concurrent.Callable
+
+import akka.dispatch.Futures
+import com.typesafe.config.Config
+import org.apache.eagle.alert.entity.SiteApplicationServiceEntity
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity
+import org.apache.eagle.policy.common.Constants
+import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, TopologyExecutionEntity, TopologyExecutionStatus, TopologyOperationEntity}
+import org.apache.eagle.service.client.EagleServiceConnector
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.collection.JavaConversions
+import scala.concurrent.ExecutionContext
+
+
+/**
+ * Asynchronous data-access helper for the application scheduler. Every public
+ * operation runs on the supplied execution context and returns a future; each
+ * call opens its own Eagle service client and closes it when the call ends,
+ * whether it succeeds or fails.
+ *
+ * @param config configuration used to build the Eagle service connector
+ * @param ex     execution context the futures run on
+ */
+class ApplicationSchedulerAsyncDAO(config: Config, ex: ExecutionContext) {
+  private val LOG: Logger = LoggerFactory.getLogger(classOf[ApplicationSchedulerAsyncDAO])
+  private val connector: EagleServiceConnector = new EagleServiceConnector(config)
+
+  /** Creates a new service client; the caller is responsible for closing it. */
+  def getEagleServiceClient(): EagleServiceClientImpl = {
+    new EagleServiceClientImpl(connector)
+  }
+
+  /** Runs `action` with a fresh client and always closes the client afterwards. */
+  private def withClient[T](action: EagleServiceClientImpl => T): T = {
+    val client = getEagleServiceClient()
+    try {
+      action(client)
+    } finally {
+      client.close()
+    }
+  }
+
+  /**
+   * Loads all topology operations whose status equals `status`.
+   * The future fails when the query is unsuccessful or returns no result list.
+   */
+  def readOperationsByStatus(status: String) = {
+    Futures.future(new Callable[util.List[TopologyOperationEntity]]{
+      override def call(): util.List[TopologyOperationEntity] = {
+        val query = "%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, status)
+        withClient { client =>
+          val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] = client.search(query).pageSize(Int.MaxValue).send()
+          if (!response.isSuccess || response.getObj == null)
+            throw new Exception(s"Fail to load operations with status $status")
+          response.getObj
+        }
+      }
+    }, ex)
+  }
+
+  /**
+   * Loads every topology execution whose status is not NEW.
+   * The future fails when the query is unsuccessful or returns no result list.
+   */
+  def loadAllTopologyExecutionEntities() = {
+    Futures.future(new Callable[util.List[TopologyExecutionEntity]]{
+      override def call(): util.List[TopologyExecutionEntity] = {
+        val query = "%s[@status != \"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, TopologyExecutionStatus.NEW)
+        withClient { client =>
+          val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] = client.search(query).pageSize(Int.MaxValue).send()
+          if (!response.isSuccess || response.getObj == null) throw new Exception(response.getException)
+          response.getObj
+        }
+      }
+    }, ex)
+  }
+
+  /**
+   * Loads the single topology execution matching site, application and
+   * topology name. The future fails when the query is unsuccessful or when it
+   * does not return exactly one entity.
+   */
+  def loadTopologyExecutionByName(site: String, appName: String, topologyName: String) = {
+    Futures.future(new Callable[TopologyExecutionEntity]{
+      override def call(): TopologyExecutionEntity = {
+        val query = "%s[@site=\"%s\" AND @application=\"%s\" AND @topology=\"%s\"]{*}".format(Constants.TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME, site, appName, topologyName)
+        LOG.info(s"query=$query")
+        withClient { client =>
+          val response: GenericServiceAPIResponseEntity[TopologyExecutionEntity] = client.search(query).pageSize(Int.MaxValue).send()
+          if (!response.isSuccess || response.getObj == null)
+            throw new Exception(s"Fail to load topologyExecutionEntity with application=$appName topology=$topologyName due to Exception: ${response.getException}")
+          if (response.getObj.size() != 1) {
+            throw new Exception(s"Get 0 or more than 1 topologyExecutionEntity with application=$appName topology=$topologyName")
+          }
+          response.getObj.get(0)
+        }
+      }
+    }, ex)
+  }
+
+  /**
+   * Loads the topology description named `topologyName` and sets its context
+   * to the config of the site application identified by `site` and
+   * `application`. The future fails when either lookup is unsuccessful or
+   * returns nothing.
+   */
+  def loadTopologyDescriptionByName(site: String, application: String, topologyName: String) = {
+    Futures.future(new Callable[TopologyDescriptionEntity]{
+      override def call(): TopologyDescriptionEntity = {
+        withClient { client =>
+          val descriptionQuery = "%s[@topology=\"%s\"]{*}".format(Constants.TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME, topologyName)
+          val response: GenericServiceAPIResponseEntity[TopologyDescriptionEntity] = client.search(descriptionQuery).pageSize(Int.MaxValue).send()
+          if (!response.isSuccess || response.getObj == null || response.getObj.size() == 0)
+            throw new Exception(s"Fail to load TopologyDescriptionEntity with site=$site application=$application topology=$topologyName due to Exception: ${response.getException}")
+          val topologyDescriptionEntity = response.getObj.get(0)
+
+          val query = "%s[@site=\"%s\" AND @application=\"%s\"]{*}".format(Constants.SITE_APPLICATION_SERVICE_ENDPOINT_NAME, site, application)
+          val configResponse: GenericServiceAPIResponseEntity[SiteApplicationServiceEntity] = client.search(query).pageSize(Int.MaxValue).send()
+          if (!configResponse.isSuccess || configResponse.getObj == null || configResponse.getObj.size() == 0)
+            throw new Exception(s"Fail to load topology configuration with query=$query due to Exception: ${configResponse.getException}")
+          val siteApplicationEntity = configResponse.getObj.get(0)
+          topologyDescriptionEntity.setContext(siteApplicationEntity.getConfig)
+          topologyDescriptionEntity
+        }
+      }
+    }, ex)
+  }
+
+  /**
+   * Stamps `operation` with the current time and persists it.
+   * The future fails with a RuntimeException when the update is rejected.
+   */
+  def updateOperationStatus(operation: TopologyOperationEntity) = {
+    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
+      override def call(): GenericServiceAPIResponseEntity[String] = {
+        if (LOG.isDebugEnabled()) LOG.debug(s"Updating status of command[$operation] as ${operation.getStatus}")
+        operation.setLastModifiedDate(System.currentTimeMillis())
+        withClient { client =>
+          val response = client.update(java.util.Arrays.asList(operation), classOf[TopologyOperationEntity])
+          if (response.isSuccess) {
+            LOG.info(s"Updated operation status [$operation] as: ${operation.getStatus}")
+          } else {
+            LOG.error(s"Failed to update status as ${operation.getStatus} of command[$operation]")
+            throw new RuntimeException(s"Failed to update command due to exception: ${response.getException}")
+          }
+          response
+        }
+      }
+    }, ex)
+  }
+
+  /**
+   * Stamps `topology` with the current time and persists it. A rejected
+   * update is logged and the unsuccessful response is returned as-is.
+   */
+  def updateTopologyExecutionStatus(topology: TopologyExecutionEntity) = {
+    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
+      override def call(): GenericServiceAPIResponseEntity[String] = {
+        if (LOG.isDebugEnabled()) LOG.debug(s"Updating status of app[$topology] as ${topology.getStatus}")
+        topology.setLastModifiedDate(System.currentTimeMillis())
+        // The client must stay open until the update has been sent; the
+        // previous code closed it before calling update.
+        withClient { client =>
+          val response = client.update(java.util.Arrays.asList(topology), classOf[TopologyExecutionEntity])
+          if (response.isSuccess) {
+            LOG.info(s"Updated status application[$topology] as: ${topology.getStatus}")
+          } else {
+            LOG.error(s"Failed to update status as ${topology.getStatus} of application[$topology] due to ${response.getException}")
+          }
+          response
+        }
+      }
+    }, ex)
+  }
+
+  /**
+   * Marks every PENDING topology operation as FAILED and persists the change.
+   * Returns the update response, or a fresh empty response when the query
+   * fails or finds nothing to clear.
+   */
+  def clearPendingOperations() = {
+    Futures.future(new Callable[GenericServiceAPIResponseEntity[String]]{
+      override def call(): GenericServiceAPIResponseEntity[String] = {
+        LOG.info("start to clear operation")
+        val query: String = "%s[@status=\"%s\"]{*}".format(Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME, TopologyOperationEntity.OPERATION_STATUS.PENDING)
+        withClient { client =>
+          val response: GenericServiceAPIResponseEntity[TopologyOperationEntity] = client.search(query).pageSize(Int.MaxValue).send()
+          if (response.isSuccess && response.getObj != null && response.getObj.size != 0) {
+            val failedOperations: util.List[TopologyOperationEntity] = new util.ArrayList[TopologyOperationEntity]
+            JavaConversions.collectionAsScalaIterable(response.getObj) foreach { operation =>
+              operation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+              failedOperations.add(operation)
+            }
+            val ret: GenericServiceAPIResponseEntity[String] = client.update(failedOperations, Constants.TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME)
+            if (ret.isSuccess) {
+              LOG.info(s"Successfully clear ${failedOperations.size()} pending operations")
+            } else {
+              LOG.error(s"Failed to clear pending operations due to exception:" + ret.getException)
+            }
+            ret
+          } else {
+            new GenericServiceAPIResponseEntity[String]()
+          }
+        }
+      }
+    }, ex)
+  }
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
new file mode 100644
index 0000000..88271bb
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatform.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyDescriptionEntity}
+
+
+/**
+ * Operations an execution platform must provide for managing topologies.
+ * All methods return `Unit`; what each implementation does with the entities
+ * it receives is defined by the implementing class.
+ */
+trait ExecutionPlatform {
+  /** Starts `topology` for the given execution record. */
+  def start(topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit
+
+  /** Stops the topology behind `topologyExecution`. */
+  def stop(topologyExecution: TopologyExecutionEntity, config: Config): Unit
+
+  /** Status operation over a list of execution records. */
+  def status(topologyExecutions: java.util.List[TopologyExecutionEntity], config: Config): Unit
+
+  /** Status operation over a single execution record. */
+  def status(topologyExecution: TopologyExecutionEntity, config: Config): Unit
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
new file mode 100644
index 0000000..6b9c033
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/ExecutionPlatformFactory.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.stream.application.impl.StormExecutionPlatform
+import org.slf4j.{LoggerFactory, Logger}
+
+import scala.collection.mutable
+
+
+/**
+ * Creates and caches [[ExecutionPlatform]] instances, one per manager type.
+ */
+object ExecutionPlatformFactory {
+  private val LOG: Logger = LoggerFactory.getLogger(ExecutionPlatformFactory.getClass)
+
+  var managerCache = new mutable.HashMap[String, ExecutionPlatform] with
+    mutable.SynchronizedMap[String, ExecutionPlatform]
+
+  /**
+   * Returns the platform registered for `managerType`, creating and caching
+   * it on first use.
+   *
+   * @throws Exception if `managerType` is not a supported platform type
+   */
+  def getApplicationManager(managerType: String): ExecutionPlatform = {
+    // Hold the cache's own monitor so lookup and creation happen as one step
+    // and concurrent callers cannot create two platforms for the same type.
+    managerCache.synchronized {
+      managerCache.get(managerType) match {
+        case Some(platform) => platform
+        case None =>
+          managerType match {
+            case AppManagerConstants.EAGLE_CLUSTER_STORM =>
+              val instance = new StormExecutionPlatform
+              managerCache.put(managerType, instance)
+              instance
+            case _ =>
+              throw new Exception(s"Invalid managerType $managerType")
+          }
+      }
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
new file mode 100644
index 0000000..07737ac
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/TaskExecutor.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application
+
+import org.codehaus.jackson.annotate.JsonIgnore
+
+/**
+ * Thread wrapper around `runnable`. The class-loader and uncaught-exception
+ * handler getters are overridden only to carry `@JsonIgnore`.
+ */
+class TaskExecutor(runnable: Runnable) extends Thread(runnable) {
+
+  @JsonIgnore override def getContextClassLoader: ClassLoader = super.getContextClassLoader
+
+  @JsonIgnore override def getUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = super.getUncaughtExceptionHandler
+
+  /** Interrupts this thread. */
+  def shutdown: Unit = {
+    interrupt()
+  }
+
+  /**
+   * Interrupts this thread and then calls `start` on it.
+   *
+   * NOTE(review): `java.lang.Thread.start` throws IllegalThreadStateException
+   * when the thread has already been started, so this can only succeed on an
+   * executor that was never started - confirm the intended use.
+   */
+  def restart: Unit = {
+    interrupt()
+    start()
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
new file mode 100644
index 0000000..7d52649
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormDynamicTopology.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.impl
+
+import com.typesafe.config.Config
+import org.apache.eagle.datastream.ExecutionEnvironments
+import org.apache.eagle.datastream.storm.StormExecutionEnvironment
+import org.apache.eagle.stream.application.AbstractDynamicApplication
+import org.slf4j.LoggerFactory
+
+
+/**
+ * Dynamic application that compiles a pipeline definition into a stream and
+ * submits it to a Storm execution environment.
+ */
+object StormDynamicTopology extends AbstractDynamicApplication {
+  val LOG = LoggerFactory.getLogger(classOf[AbstractDynamicApplication])
+
+  /**
+   * Compiles `application` with `config` and submits the resulting stream.
+   * Failures raised while creating the environment or submitting are logged
+   * with their stack trace and not rethrown; failures while compiling the
+   * stream propagate to the caller.
+   */
+  override def submit(application: String, config: Config) {
+    val stream = compileStream(application, config)
+
+    try {
+      val stormEnv = ExecutionEnvironments.getWithConfig[StormExecutionEnvironment](stream.getConfig)
+      stream.submit(stormEnv)
+    } catch {
+      case e: Throwable =>
+        // Pass the throwable itself so the stack trace is kept in the log.
+        LOG.error(e.toString, e)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
new file mode 100644
index 0000000..5b1bb48
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/impl/StormExecutionPlatform.scala
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.impl
+
+import java.net.URLDecoder
+import java.nio.file.{Files, Paths}
+
+import backtype.storm.generated.InvalidTopologyException
+import backtype.storm.utils.{NimbusClient, Utils}
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.eagle.common.config.EagleConfigConstants
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.{TopologyDescriptionEntity, TopologyExecutionEntity, TopologyExecutionStatus}
+import org.apache.eagle.stream.application.{ApplicationManager, ApplicationManagerUtils, ExecutionPlatform, TopologyFactory}
+import org.slf4j.LoggerFactory
+
+import scala.collection.JavaConversions
+
+/**
+ * String constants for topology states.
+ * NOTE(review): presumably these mirror the status strings reported by Storm - confirm.
+ */
+object StormExecutionPlatform {
+  val ACTIVE = "ACTIVE"
+  val INACTIVE = "INACTIVE"
+  val KILLED = "KILLED"
+  val REBALANCING = "REBALANCING"
+}
+
+/**
+ * [[ExecutionPlatform]] implementation for Storm. Topologies are either run inside a local
+ * worker thread obtained from [[ApplicationManager]] (local mode) or submitted to / queried
+ * from a Storm cluster through a Nimbus client.
+ */
+class StormExecutionPlatform extends ExecutionPlatform {
+  val LOG = LoggerFactory.getLogger(classOf[StormExecutionPlatform])
+
+  /**
+   * Creates a Nimbus client from the Storm configuration and command-line options. The
+   * Nimbus host and thrift port are overridden by `envContextConfig.nimbusHost` and
+   * `envContextConfig.nimbusThriftPort` when those paths exist in `appConfig`.
+   *
+   * The caller owns the returned client and must close it; prefer [[withNimbusClient]].
+   */
+  private def getNimbusClient(appConfig: com.typesafe.config.Config): NimbusClient = {
+    val conf = Utils.readStormConfig().asInstanceOf[java.util.HashMap[String, Object]]
+    conf.putAll(Utils.readCommandLineOpts().asInstanceOf[java.util.HashMap[String, Object]])
+
+    if(appConfig.hasPath("envContextConfig.nimbusHost")) {
+      LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_HOST} as ${appConfig.getString("envContextConfig.nimbusHost")}")
+      conf.put(backtype.storm.Config.NIMBUS_HOST, appConfig.getString("envContextConfig.nimbusHost"))
+    }
+
+    if(appConfig.hasPath("envContextConfig.nimbusThriftPort")) {
+      LOG.info(s"Setting ${backtype.storm.Config.NIMBUS_THRIFT_PORT} as ${appConfig.getString("envContextConfig.nimbusThriftPort")}")
+      conf.put(backtype.storm.Config.NIMBUS_THRIFT_PORT, appConfig.getNumber("envContextConfig.nimbusThriftPort"))
+    }
+    NimbusClient.getConfiguredClient(conf)
+  }
+
+  /**
+   * Runs `action` against a freshly created Nimbus client and always closes the client
+   * afterwards, so that repeated stop/status calls do not leak connections.
+   */
+  private def withNimbusClient[T](appConfig: Config)(action: NimbusClient => T): T = {
+    val nimbus = getNimbusClient(appConfig)
+    try {
+      action(nimbus)
+    } finally {
+      nimbus.close()
+    }
+  }
+
+  /**
+   * Starts the topology in local mode: the submission runs inside the worker thread returned
+   * by `ApplicationManager.submit`, and the execution entity is updated with the topology
+   * name, the worker's state and a description naming the worker.
+   *
+   * Errors raised inside the worker (including an unsupported topology type) are only logged.
+   */
+  def startLocal(topologyName: String, topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val worker: Thread = ApplicationManager.submit(topologyName, new Runnable {
+      override def run(): Unit = {
+        try {
+          val topologyType = topology.getType.toUpperCase()
+          topologyType match {
+            case TopologyDescriptionEntity.TYPE.CLASS =>
+              TopologyFactory.submit(topology.getExeClass, config)
+            case TopologyDescriptionEntity.TYPE.DYNAMIC =>
+              StormDynamicTopology.submit(topology.getExeClass, config)
+            case _ =>
+              LOG.error("Unsupported topology type: " + topology.getType)
+          }
+        } catch {
+          case ex: Throwable =>
+            LOG.error(s"topology $topologyName in local mode is interrupted with ${ex.toString}")
+        }
+      }
+    })
+    topologyExecution.setFullName(topologyName)
+    topologyExecution.setStatus(ApplicationManager.getWorkerStatus(worker.getState))
+    topologyExecution.setDescription("Running inside " + worker.toString + " in local mode")
+  }
+
+  /**
+   * Starts a topology. The jar containing [[ExecutionPlatform]] is published through the
+   * `storm.jar` system property, the generated full topology name is injected into the
+   * configuration as `envContextConfig.topologyName`, and the topology is then either
+   * started locally (see [[startLocal]]) or submitted according to its type.
+   *
+   * The running mode defaults to local mode when it is not configured.
+   *
+   * @throws Exception                if the detected jar location does not exist or is not a `.jar`
+   * @throws InvalidTopologyException if the topology type is not supported (non-local mode)
+   */
+  override def start(topology: TopologyDescriptionEntity, topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val stormJarPath: String = URLDecoder.decode(classOf[ExecutionPlatform].getProtectionDomain.getCodeSource.getLocation.getPath, "UTF-8")
+    if (stormJarPath == null || !Files.exists(Paths.get(stormJarPath)) || !stormJarPath.endsWith(".jar")) {
+      val errMsg = s"storm jar file $stormJarPath does not exists, or is a invalid jar file"
+      LOG.error(errMsg)
+      throw new Exception(errMsg)
+    }
+    LOG.info(s"Detected a storm.jar location at: $stormJarPath")
+    System.setProperty("storm.jar", stormJarPath)
+
+    val fullName = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
+    val extConfigStr = "envContextConfig.topologyName=%s".format(fullName)
+    val extConfig = ConfigFactory.parseString(extConfigStr)
+    // The generated topology name takes precedence over anything in the supplied config.
+    val newConfig = extConfig.withFallback(config)
+
+    val mode = if(config.hasPath(AppManagerConstants.RUNNING_MODE)) config.getString(AppManagerConstants.RUNNING_MODE) else EagleConfigConstants.LOCAL_MODE
+    topologyExecution.setMode(mode)
+    if (topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
+      startLocal(fullName, topology, topologyExecution, newConfig)
+      return
+    }
+
+    val topologyType = topology.getType.toUpperCase()
+    topologyType match {
+      case TopologyDescriptionEntity.TYPE.CLASS =>
+        TopologyFactory.submit(topology.getExeClass, newConfig)
+      case TopologyDescriptionEntity.TYPE.DYNAMIC =>
+        StormDynamicTopology.submit(topology.getExeClass, newConfig)
+      case _ =>
+        throw new InvalidTopologyException("Unsupported topology type: " + topology.getType)
+    }
+    topologyExecution.setFullName(fullName)
+  }
+
+  /**
+   * Stops a topology: local topologies are stopped through [[stopLocal]]; otherwise the
+   * topology is killed through Nimbus and the execution is marked as STOPPING.
+   */
+  override def stop(topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
+
+    if(topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
+      stopLocal(name, topologyExecution)
+    } else {
+      withNimbusClient(config) { nimbus => nimbus.getClient.killTopology(name) }
+      topologyExecution.setStatus(TopologyExecutionStatus.STOPPING)
+    }
+  }
+
+  /** Stops the local worker registered under `name` and records its resulting state. */
+  def stopLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = {
+    val taskWorker = ApplicationManager.stop(name)
+    topologyExecution.setStatus(ApplicationManager.getWorkerStatus(taskWorker.getState))
+    topologyExecution.setDescription(s"topology status is ${taskWorker.getState}")
+  }
+
+  /**
+   * Looks up the summary of the topology named `topologyName` in the cluster info returned
+   * by Nimbus.
+   *
+   * @return `Some(summary)` when a topology with that name is listed, otherwise `None`
+   */
+  def getTopology(topologyName: String, config: Config) = withNimbusClient(config) { nimbus =>
+    val topologySummaries = nimbus.getClient.getClusterInfo.get_topologies
+    // `find` is strict, so the lookup completes before the client is closed.
+    JavaConversions.collectionAsScalaIterable(topologySummaries).find { t => t.get_name.equals(topologyName) }
+  }
+
+  /**
+   * Refreshes status, URL and description of a single topology execution. A non-local
+   * topology that Nimbus does not list is reported as STOPPED.
+   */
+  override def status(topologyExecution: TopologyExecutionEntity, config: Config): Unit = {
+    val name: String = ApplicationManagerUtils.generateTopologyFullName(topologyExecution)
+
+    if(topologyExecution.getMode.equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)) {
+      statusLocal(name, topologyExecution)
+    } else {
+      getTopology(name, config) match {
+        case Some(topology) =>
+          topologyExecution.setStatus(ApplicationManager.getTopologyStatus(topology.get_status()))
+          topologyExecution.setUrl(ApplicationManagerUtils.buildStormTopologyURL(config, topology.get_id()))
+          topologyExecution.setDescription(topology.toString)
+        case None =>
+          topologyExecution.setStatus(TopologyExecutionStatus.STOPPED)
+          topologyExecution.setUrl("")
+          topologyExecution.setDescription(s"Fail to find topology: $name")
+      }
+    }
+  }
+
+  /**
+   * Refreshes the status of a local topology from the state of its worker. When the status
+   * is unchanged and already STOPPED, the worker is removed from [[ApplicationManager]].
+   * Any failure while resolving the worker is treated as the topology being stopped.
+   */
+  def statusLocal(name: String, topologyExecution: TopologyExecutionEntity): Unit = {
+    try {
+      val currentStatus = topologyExecution.getStatus()
+      val newStatus = ApplicationManager.getWorkerStatus(ApplicationManager.get(name).getState())
+      if (!currentStatus.equals(newStatus)) {
+        LOG.info("Status of topology: %s changed from %s to %s".format(topologyExecution.getFullName, currentStatus, newStatus))
+        topologyExecution.setStatus(newStatus)
+        topologyExecution.setDescription(String.format("Status of topology: %s changed from %s to %s", name, currentStatus, newStatus))
+      } else if(currentStatus.equalsIgnoreCase(TopologyExecutionStatus.STOPPED)) {
+        ApplicationManager.remove(name)
+      }
+    } catch {
+      case ex: Throwable =>
+        // Keep a trace of why the worker could not be resolved instead of dropping it silently.
+        LOG.debug(s"Fail to resolve local worker of topology $name, marking it as stopped", ex)
+        topologyExecution.setDescription("")
+        topologyExecution.setStatus(TopologyExecutionStatus.STOPPED)
+    }
+  }
+
+  /** Refreshes the status of every topology execution in the list, one after another. */
+  override def status(topologyExecutions: java.util.List[TopologyExecutionEntity], config: Config): Unit = {
+    JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach {
+      topologyExecution => status(topologyExecution, config)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
new file mode 100644
index 0000000..8fbf60d
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandExecutor.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import java.util.concurrent.Callable
+
+import akka.actor.{Actor, ActorLogging}
+import akka.dispatch.Futures
+import com.typesafe.config.{Config, ConfigFactory, ConfigParseOptions, ConfigSyntax}
+import org.apache.eagle.common.config.EagleConfigConstants
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION
+import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyExecutionStatus, TopologyOperationEntity}
+import org.apache.eagle.stream.application.{ApplicationSchedulerAsyncDAO, ExecutionPlatformFactory}
+
+import scala.collection.JavaConversions
+import scala.util.{Failure, Success}
+
+
+/**
+ * Actor that executes START/STOP operations on topology executions through the execution
+ * platform selected by [[ExecutionPlatformFactory]], and refreshes the status of all
+ * topology executions on every health-check event. Results are persisted through the DAO
+ * created on initialization.
+ */
+private[scheduler] class AppCommandExecutor extends Actor with ActorLogging {
+  @volatile var _config: Config = _
+  @volatile var _dao: ApplicationSchedulerAsyncDAO = _
+
+  import context.dispatcher
+
+  /** Records `message` on the operation, marks the operation as FAILED and persists it. */
+  private def failOperation(topologyOperation: TopologyOperationEntity, message: String): Unit = {
+    topologyOperation.setMessage(message)
+    topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.FAILED)
+    _dao.updateOperationStatus(topologyOperation)
+  }
+
+  /**
+   * Loads the topology description named by the operation, derives the application config
+   * from its context (falling back to the actor's config) and starts the topology
+   * asynchronously. The operation ends up SUCCESS or FAILED (with the error message).
+   */
+  def start(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = {
+    val options: ConfigParseOptions = ConfigParseOptions.defaults.setSyntax(ConfigSyntax.PROPERTIES).setAllowMissing(false)
+    _dao.loadTopologyDescriptionByName(topologyOperation.getSite, topologyOperation.getApplication, topologyOperation.getTopology) onComplete {
+      case Success(topology) =>
+        // This callback runs outside of execute()'s try/catch, so synchronous failures
+        // (e.g. an unparsable topology context) must be turned into a FAILED operation here.
+        try {
+          val topologyConfig: Config = ConfigFactory.parseString(topology.getContext, options)
+
+          if(!topologyConfig.hasPath(EagleConfigConstants.APP_CONFIG)) {
+            failOperation(topologyOperation, "Fail to detect topology configuration")
+          } else {
+            val config = topologyConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(_config)
+            val clusterType = if(config.hasPath(AppManagerConstants.CLUSTER_ENV)) config.getString(AppManagerConstants.CLUSTER_ENV) else AppManagerConstants.EAGLE_CLUSTER_STORM
+            topologyExecution.setEnvironment(clusterType)
+
+            Futures.future(new Callable[TopologyExecutionEntity]{
+              override def call(): TopologyExecutionEntity = {
+                topologyExecution.setStatus(TopologyExecutionStatus.STARTING)
+                _dao.updateTopologyExecutionStatus(topologyExecution)
+                ExecutionPlatformFactory.getApplicationManager(clusterType).start(topology, topologyExecution, config)
+                topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+                topologyExecution
+              }
+            }, context.dispatcher) onComplete {
+              case Success(_) =>
+                topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+                updateStatus(topologyExecution, topologyOperation)
+              case Failure(ex) =>
+                failOperation(topologyOperation, ex.getMessage)
+            }
+          }
+        } catch {
+          case ex: Exception =>
+            failOperation(topologyOperation, ex.getMessage)
+        }
+
+      case Failure(ex) =>
+        failOperation(topologyOperation, ex.getMessage)
+    }
+  }
+
+  /**
+   * Stops the topology asynchronously on the platform recorded in the execution's
+   * environment. The operation ends up SUCCESS or FAILED (with the exception's string form).
+   */
+  def stop(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) = {
+    val clusterType = topologyExecution.getEnvironment
+
+    Futures.future(new Callable[TopologyExecutionEntity]{
+      override def call(): TopologyExecutionEntity = {
+        topologyExecution.setStatus(TopologyExecutionStatus.STOPPING)
+        _dao.updateTopologyExecutionStatus(topologyExecution)
+        ExecutionPlatformFactory.getApplicationManager(clusterType).stop(topologyExecution, _config)
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+        topologyExecution
+      }
+    }, context.dispatcher) onComplete {
+      case Success(_) =>
+        topologyOperation.setStatus(TopologyOperationEntity.OPERATION_STATUS.SUCCESS)
+        updateStatus(topologyExecution, topologyOperation)
+      case Failure(ex) =>
+        failOperation(topologyOperation, ex.toString)
+    }
+  }
+
+  /**
+   * Asks the execution platform asynchronously to refresh the status of the execution and
+   * persists the execution afterwards, whether or not the refresh succeeded.
+   */
+  def status(topologyExecution: TopologyExecutionEntity) = {
+    val clusterType = topologyExecution.getEnvironment
+
+    Futures.future(new Callable[TopologyExecutionEntity]{
+      override def call(): TopologyExecutionEntity = {
+        ExecutionPlatformFactory.getApplicationManager(clusterType).status(topologyExecution, _config)
+        topologyExecution
+      }
+    }, context.dispatcher) onComplete {
+      case _ =>
+        _dao.updateTopologyExecutionStatus(topologyExecution)
+    }
+  }
+
+  /** Persists both the operation and the topology execution. */
+  def updateStatus(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = {
+    _dao.updateOperationStatus(topologyOperation)
+    _dao.updateTopologyExecutionStatus(topologyExecution)
+  }
+
+  /**
+   * Dispatches the operation to [[start]] or [[stop]]. Unsupported operations and any
+   * synchronous failure mark the operation as FAILED.
+   */
+  def execute(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity): Unit = {
+    try {
+      topologyOperation.getOperation match {
+        case OPERATION.START =>
+          start(topologyExecution, topologyOperation)
+        case OPERATION.STOP =>
+          stop(topologyExecution, topologyOperation)
+        case _ =>
+          log.warning("Unsupported operation: " + topologyOperation)
+          throw new Exception(s"Unsupported operation: ${topologyOperation.getOperation}, possible values are START/STOP")
+      }
+    } catch {
+      case e: Throwable =>
+        failOperation(topologyOperation, e.getMessage)
+    }
+  }
+
+  override def receive = {
+    case InitializationEvent(config: Config) =>
+      _config = config
+      _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher)
+    case SchedulerCommand(topologyExecution, topologyOperation) =>
+      execute(topologyExecution, topologyOperation)
+    case HealthCheckerEvent =>
+      _dao.loadAllTopologyExecutionEntities() onComplete {
+        case Success(topologyExecutions) =>
+          log.info(s"Load ${topologyExecutions.size()} topologies in execution")
+          JavaConversions.collectionAsScalaIterable(topologyExecutions) foreach { topologyExecution =>
+            try{
+              status(topologyExecution)
+            } catch {
+              case ex: Throwable =>
+                log.error(ex.getMessage)
+            }
+          }
+        case Failure(ex) =>
+          log.error(s"Fail to load any topologyExecutionEntity due to Exception: ${ex.getMessage}")
+      }
+    case TerminatedEvent =>
+      context.stop(self)
+    case m@_ =>
+      // The `s` interpolator is required, otherwise the literal text "$m" is logged.
+      log.warning(s"Unsupported operation $m")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
new file mode 100644
index 0000000..c731846
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/AppCommandLoader.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{Actor, ActorLogging}
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.entity.TopologyOperationEntity.OPERATION_STATUS
+import org.apache.eagle.stream.application.ApplicationSchedulerAsyncDAO
+
+import scala.collection.JavaConversions
+import scala.util.{Failure, Success}
+
+
+/**
+ * Actor that loads newly INITIALIZED operations through the DAO, marks each of them as
+ * PENDING, resolves the matching topology execution and replies to the requester with a
+ * [[SchedulerCommand]] per operation. Operations that cannot be prepared are marked FAILED.
+ */
+private[scheduler] class AppCommandLoader extends Actor with ActorLogging {
+  @volatile var _config: Config = null
+  @volatile var _dao: ApplicationSchedulerAsyncDAO = null
+
+  import context.dispatcher
+
+  override def receive = {
+    case InitializationEvent(config: Config) =>
+      _config = config
+      _dao = new ApplicationSchedulerAsyncDAO(config, context.dispatcher)
+    case ClearPendingOperation =>
+      if(_dao == null) _dao = new ApplicationSchedulerAsyncDAO(_config, context.dispatcher)
+      _dao.clearPendingOperations()
+    case CommandLoaderEvent => {
+      // Capture the sender now: sender() must not be called from the future callbacks below.
+      val _sender = sender()
+      _dao.readOperationsByStatus(OPERATION_STATUS.INITIALIZED) onComplete {
+        case Success(commands) => {
+          log.info(s"Load ${commands.size()} new commands")
+          JavaConversions.collectionAsScalaIterable(commands) foreach { command =>
+            command.setStatus(OPERATION_STATUS.PENDING)
+            _dao.updateOperationStatus(command) onComplete {
+              case Success(response) =>
+                _dao.loadTopologyExecutionByName(command.getSite, command.getApplication, command.getTopology) onComplete {
+                  case Success(topologyExecution) => {
+                    _sender ! SchedulerCommand(topologyExecution, command)
+                  }
+                  case Failure(ex) =>
+                    log.error(ex.getMessage)
+                    command.setMessage(ex.getMessage)
+                    command.setStatus(OPERATION_STATUS.FAILED)
+                    _dao.updateOperationStatus(command)
+                }
+              case Failure(ex) =>
+                log.error(s"Got an exception to update command status $command: ${ex.getMessage}")
+                command.setMessage(ex.getMessage)
+                command.setStatus(OPERATION_STATUS.FAILED)
+                _dao.updateOperationStatus(command)
+            }
+          }
+        }
+        case Failure(ex) =>
+          log.error(s"Failed to get commands due to exception ${ex.getMessage}")
+      }
+    }
+    case TerminatedEvent =>
+      context.stop(self)
+    case m@_ =>
+      // Only warn, like the sibling actors do: throwing from receive would make the
+      // supervisor restart this actor, which resets _config and _dao to null.
+      log.warning(s"Event is not supported $m")
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
new file mode 100644
index 0000000..476a3fb
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/ApplicationScheduler.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import com.typesafe.config.Config
+import org.apache.eagle.service.application.AppManagerConstants
+import org.apache.eagle.service.application.entity.{TopologyExecutionEntity, TopologyOperationEntity}
+import org.apache.eagle.stream.application.ApplicationManager
+
+import scala.concurrent.duration._
+
+
+// Messages exchanged between the scheduler and its actors; ScheduleEvent is their common base.
+// NOTE(review): the parameterless events are declared as case classes, but the senders and
+// the `receive` patterns in this package refer to them without parentheses, i.e. they pass
+// and match the companion objects rather than instances.
+private[scheduler] class ScheduleEvent
+// Carries the configuration the receiving actor should use from now on.
+private[scheduler] case class InitializationEvent(config: Config) extends ScheduleEvent
+private[scheduler] case class TerminatedEvent() extends ScheduleEvent
+private[scheduler] case class CommandLoaderEvent() extends ScheduleEvent
+private[scheduler] case class HealthCheckerEvent() extends ScheduleEvent
+private[scheduler] case class ClearPendingOperation() extends ScheduleEvent
+// Pairs an operation with the topology execution it applies to.
+private[scheduler] case class SchedulerCommand(topologyExecution: TopologyExecutionEntity, topologyOperation: TopologyOperationEntity) extends ScheduleEvent
+
+// Public exception types of the scheduler package; each one carries only a message.
+case class EagleServiceUnavailableException(message:String) extends Exception(message)
+case class DuplicatedDefinitionException(message:String) extends Exception(message)
+case class LoadTopologyFailureException(message:String) extends Exception(message)
+
+
+/**
+ * 1. Sync command from eagle service
+ * 2. Coordinate command to different actor
+ * 3. Actor execute command as requested
+ */
+class ApplicationScheduler {
+  val DEFAULT_COMMAND_LOADER_INTERVAL_SECS = 2
+  val DEFAULT_HEALTH_CHECK_INTERVAL_SECS = 5
+
+  /**
+   * Creates the scheduler's actor system, spawns the coordinator actor and schedules the
+   * initialization, pending-operation cleanup, command-loading and health-check messages
+   * for it. Intervals are read from `config` and default to the constants of this class.
+   *
+   * @param config configuration for the actor system and the actors
+   * @return the started actor system
+   */
+  def start(config: Config) = {
+    val actorSystem = ActorSystem("application-manager-scheduler", config)
+    actorSystem.log.info(s"Started actor system: $actorSystem")
+
+    import actorSystem.dispatcher
+
+    // Reads an interval in seconds from the config, using `fallback` when the path is absent.
+    def intervalSecs(path: String, fallback: Long): Long =
+      if (config.hasPath(path)) config.getLong(path) else fallback
+
+    val loaderInterval = intervalSecs(AppManagerConstants.APP_COMMAND_LOADER_INTERVAL_SECS, DEFAULT_COMMAND_LOADER_INTERVAL_SECS).seconds
+    val healthInterval = intervalSecs(AppManagerConstants.APP_HEALTH_CHECK_INTERVAL_SECS, DEFAULT_HEALTH_CHECK_INTERVAL_SECS).seconds
+
+    val coordinator = actorSystem.actorOf(Props[StreamAppCoordinator])
+    val timer = actorSystem.scheduler
+    timer.scheduleOnce(0.seconds, coordinator, InitializationEvent(config))
+    timer.scheduleOnce(1.seconds, coordinator, ClearPendingOperation)
+    timer.schedule(2.seconds, loaderInterval, coordinator, CommandLoaderEvent)
+    timer.schedule(10.seconds, healthInterval, coordinator, HealthCheckerEvent)
+
+    // Termination callbacks run once the actor system has been shut down, after all
+    // actors have been stopped.
+    actorSystem.registerOnTermination(new Runnable {
+      override def run(): Unit = {
+        coordinator ! TerminatedEvent
+        ApplicationManager.stopAll()
+      }
+    })
+    actorSystem
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
new file mode 100644
index 0000000..17006ee
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/main/scala/org/apache/eagle/stream/application/scheduler/StreamAppCoordinator.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{Actor, ActorLogging, ActorRef, Props}
+
+/**
+ * Coordinator actor: creates the command loader and command executor as children and
+ * routes every scheduler message to the child responsible for it.
+ */
+private[scheduler] class StreamAppCoordinator extends Actor with ActorLogging {
+  var commandLoader: ActorRef = null
+  var commandExecutor: ActorRef = null
+
+
+  /** Spawns the two child actors before the first message is processed. */
+  override def preStart(): Unit = {
+    commandLoader = context.actorOf(Props[AppCommandLoader], "command-loader")
+    commandExecutor = context.actorOf(Props[AppCommandExecutor], "command-worker")
+  }
+
+  override def receive = {
+    case InitializationEvent(config) => {
+      log.info(s"Config updated: $config")
+      commandLoader ! InitializationEvent(config)
+      commandExecutor ! InitializationEvent(config)
+    }
+    case ClearPendingOperation =>
+      commandLoader ! ClearPendingOperation
+    case CommandLoaderEvent =>
+      commandLoader ! CommandLoaderEvent
+    case command: SchedulerCommand =>
+      // Log the received command itself, not the SchedulerCommand companion object.
+      log.info(s"Executing command: $command")
+      commandExecutor ! command
+    case HealthCheckerEvent =>
+      commandExecutor ! HealthCheckerEvent
+    case TerminatedEvent =>
+      log.info("Coordinator exit ...")
+      context.stop(self)
+    case m@_ =>
+      log.warning(s"Coordinator Unsupported message: $m")
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/application.conf b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/application.conf
new file mode 100644
index 0000000..4c21a7c
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/application.conf
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+### scheduler properties
+appCommandLoaderIntervalSecs = 1
+appHealthCheckIntervalSecs = 5
+
+### execution platform properties
+envContextConfig.env = "storm"
+envContextConfig.url = "http://sandbox.hortonworks.com:8744"
+envContextConfig.nimbusHost = "sandbox.hortonworks.com"
+envContextConfig.nimbusThriftPort = 6627
+envContextConfig.jarFile = "/dir-to-jar/eagle-topology-0.3.0-incubating-assembly.jar"
+
+### default topology properties
+eagleProps.mailHost = "mailHost.com"
+eagleProps.mailSmtpPort = "25"
+eagleProps.mailDebug = "true"
+eagleProps.eagleService.host = "localhost"
+eagleProps.eagleService.port = 9099
+eagleProps.eagleService.username = "admin"
+eagleProps.eagleService.password = "secret"
+eagleProps.dataJoinPollIntervalSec = 30
+
+dynamicConfigSource.enabled = true
+dynamicConfigSource.initDelayMillis = 0
+dynamicConfigSource.delayMillis = 30000
+
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/log4j.properties b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/log4j.properties
new file mode 100644
index 0000000..25331ab
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/resources/log4j.properties
@@ -0,0 +1,35 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+ eagle.log.dir=../logs
+ eagle.log.file=eagle.log
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+# Daily Rolling File Appender (configured here, but not attached to log4j.rootLogger, which lists only stdout)
+ log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+ log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
+ log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+## 30-day backup
+# log4j.appender.DRFA.MaxBackupIndex=30
+ log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
new file mode 100644
index 0000000..e87ee92
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/MockTopology.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import com.typesafe.config.Config
+import org.apache.eagle.stream.application.TopologyExecutable
+import org.slf4j.LoggerFactory
+
+class MockTopology extends TopologyExecutable { // test stand-in: submit() only writes a log line
+ private val LOG = LoggerFactory.getLogger(classOf[MockTopology])
+ override def submit(topology: String, config: Config): Unit = { // `config` is accepted but never read
+ LOG.info(s"$topology is running")
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
new file mode 100644
index 0000000..1cad3a7
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/StormApplicationManagerSpec.scala
@@ -0,0 +1,40 @@
+package org.apache.eagle.stream.application.scheduler
+
+import com.typesafe.config.ConfigFactory
+import org.apache.eagle.common.config.EagleConfigConstants
+import org.apache.eagle.stream.application.ExecutionPlatform
+import org.apache.eagle.stream.application.impl.StormExecutionPlatform
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+
+object StormApplicationManagerSpec extends App { // runnable object, not a test suite: it only builds `conf`; the execute call below is commented out
+ val manager: ExecutionPlatform = new StormExecutionPlatform
+ val baseConfig = ConfigFactory.load()
+ val topoConfigStr: String = "web{\"hbase.zookeeper.property.clientPort\":\"2181\", \"hbase.zookeeper.quorum\":\"localhost\"}\napp{\n \"envContextConfig\" : {\n \"env\" : \"storm\",\n \"mode\" : \"cluster\",\n \"topologyName\" : \"sandbox-hbaseSecurityLog-topology\",\n \"stormConfigFile\" : \"security-auditlog-storm.yaml\",\n \"parallelismConfig\" : {\n \"kafkaMsgConsumer\" : 1,\n \"hbaseSecurityLogAlertExecutor*\" : 1\n }\n },\n \"dataSourceConfig\": {\n \"topic\" : \"sandbox_hbase_security_log\",\n \"zkConnection\" : \"127.0.0.1:2181\",\n \"zkConnectionTimeoutMS\" : 15000,\n \"brokerZkPath\" : \"/brokers\",\n \"fetchSize\" : 1048586,\n \"deserializerClass\" : \"org.apache.eagle.security.hbase.parse.HbaseAuditLogKafkaDeserializer\",\n \"transactionZKServers\" : \"127.0.0.1\",\n \"transactionZKPort\" : 2181,\n \"transactionZKRoot\" : \"/consumers\",\n \"consumerGroupId\" : \"eagle.hbasesecurity.consumer\",\n
\"transactionStateUpdateMS\" : 2000\n },\n \"alertExecutorConfigs\" : {\n \"hbaseSecurityLogAlertExecutor\" : {\n \"parallelism\" : 1,\n \"partitioner\" : \"org.apache.eagle.policy.DefaultPolicyPartitioner\"\n \"needValidation\" : \"true\"\n }\n },\n \"eagleProps\" : {\n \"site\" : \"sandbox\",\n \"application\": \"hbaseSecurityLog\",\n \"dataJoinPollIntervalSec\" : 30,\n \"mailHost\" : \"mailHost.com\",\n \"mailSmtpPort\":\"25\",\n \"mailDebug\" : \"true\",\n \"eagleService\": {\n \"host\": \"localhost\",\n \"port\": 9099\n \"username\": \"admin\",\n \"password\": \"secret\"\n }\n },\n \"dynamicConfigSource\" : {\n \"enabled\" : true,\n \"initDelayMillis\" : 0,\n \"delayMillis\" : 30000\n }\n}"
+
+ val topoConfig = ConfigFactory.parseString(topoConfigStr)
+ val conf = topoConfig.getConfig(EagleConfigConstants.APP_CONFIG).withFallback(baseConfig) // the literal's top-level keys must equal WEB_CONFIG/APP_CONFIG ("web"/"app"); a missing key makes getConfig throw
+
+ //val (ret, nextState) = manager.execute("START", topologyDescModel, null, conf)
+ //println(s"Result: ret=$ret, nextState=$nextState")
+}
+
+
+
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
new file mode 100644
index 0000000..3db2d67
--- /dev/null
+++ b/eagle-core/eagle-application-management/eagle-stream-application-manager/src/test/scala/org/apache/eagle/stream/application/scheduler/TestScheduler.scala
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.stream.application.scheduler
+
+import akka.actor.{ActorSystem, Props}
+import akka.testkit.{TestActorRef, TestKit}
+import com.typesafe.config.ConfigFactory
+import org.scalatest.{Ignore, BeforeAndAfterAll, MustMatchers, WordSpecLike}
+
+@Ignore // suite is currently excluded from the test run
+class TestSchedulerSpec extends TestKit(ActorSystem("stream-app-scheduler"))
+with WordSpecLike with MustMatchers with BeforeAndAfterAll {
+
+ "A Scheduler actor" must {
+ "Forward a message it receives" in {
+ val coordinator = TestActorRef[StreamAppCoordinator] // coordinator created through TestActorRef
+ coordinator ! CommandLoaderEvent
+ expectNoMsg() // only checks that nothing is sent back to the TestKit's test actor
+ }
+ }
+
+ "A Integrated test" must {
+ "run end-to-end" in {
+ val coordinator = system.actorOf(Props[StreamAppCoordinator]) // same scenario, but with a regular actor started in the suite's ActorSystem
+ coordinator ! CommandLoaderEvent
+ expectNoMsg() // again asserts only the absence of a reply
+ }
+ }
+
+ override def afterAll(): Unit = {
+ super.afterAll()
+ system.shutdown() // stop the ActorSystem created in the constructor once all tests are done
+ }
+}
+
+@Ignore // NOTE(review): this is an App object rather than a test suite — confirm the annotation has any effect here
+object TestStreamAppScheduler extends App { // manual launcher: starts an ApplicationScheduler with the Akka debug settings below
+ val conf: String = """
+ akka.loglevel = "DEBUG"
+ akka.actor.debug {
+ receive = on
+ lifecycle = on
+ }
+ """
+ new ApplicationScheduler().start(ConfigFactory.parseString(conf)) // passes the parsed settings above to start()
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-application-management/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-application-management/pom.xml b/eagle-core/eagle-application-management/pom.xml
new file mode 100644
index 0000000..0637d7e
--- /dev/null
+++ b/eagle-core/eagle-application-management/pom.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>eagle-core</artifactId>
+ <groupId>org.apache.eagle</groupId>
+ <version>0.3.0-incubating</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>eagle-application-management</artifactId>
+ <packaging>pom</packaging>
+ <description>Eagle Application Management</description>
+
+ <modules>
+ <module>eagle-stream-application-manager</module>
+ <module>eagle-application-service</module>
+ </modules>
+
+
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
index cc1e009..eb09156 100644
--- a/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-pipeline/src/main/scala/org/apache/eagle/stream/pipeline/parser/Pipeline.scala
@@ -78,6 +78,12 @@ trait PipelineParser{
}
def parseString(config:String):Pipeline = parse(ConfigFactory.parseString(config))
+
+ def parseStringWithConfig(dataFlow:String, config: Config):Pipeline = { // explicit result type, matching parseString/parseResource
+ val pConfig = config.withFallback(ConfigFactory.parseString(dataFlow)) // keys present in `config` win; the parsed `dataFlow` only fills in the rest
+ parse(pConfig)
+ }
+
def parseResource(resource:String):Pipeline = {
// TODO: Load environment, currently hard-code with storm
if(resource.startsWith("/") || resource.startsWith("./")){
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
index 5d64c4c..54d09e6 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/storm/StormTopologyExecutorImpl.scala
@@ -22,6 +22,7 @@ import _root_.storm.trident.spout.RichSpoutBatchExecutor
import backtype.storm.generated.StormTopology
import backtype.storm.utils.Utils
import backtype.storm.{Config, LocalCluster, StormSubmitter}
+import org.apache.eagle.common.config.EagleConfigConstants
import org.apache.eagle.datastream.core.AbstractTopologyExecutor
import org.apache.thrift7.transport.TTransportException
import org.slf4j.LoggerFactory
@@ -31,7 +32,7 @@ case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesa
val LOG = LoggerFactory.getLogger(classOf[StormTopologyExecutorImpl])
@throws(classOf[Exception])
def execute {
- val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase("local")
+ val localMode: Boolean = config.getString("envContextConfig.mode").equalsIgnoreCase(EagleConfigConstants.LOCAL_MODE)
val conf: Config = new Config
conf.put(RichSpoutBatchExecutor.MAX_BATCH_SIZE_CONF, Int.box(64 * 1024))
conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, Int.box(8))
@@ -91,12 +92,15 @@ case class StormTopologyExecutorImpl(topology: StormTopology, config: com.typesa
LOG.info("Submitting as local mode")
val cluster: LocalCluster = new LocalCluster
cluster.submitTopology(topologyName, conf, topology)
- while(true) {
- try {
+ try {
+ while(true) {
Utils.sleep(Integer.MAX_VALUE)
- } catch {
- case _: Throwable => () // Do nothing
}
+ } catch {
+ case ex: Throwable =>
+ LOG.warn("Sleep is interrupted with " + ex.toString)
+ cluster.killTopology(topologyName)
+ cluster.shutdown
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
index ca65669..2a33460 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
@@ -32,6 +32,10 @@ public class Constants {
public static final String APPLICATION_DESCRIPTION_SERVICE_ENDPOINT_NAME = "ApplicationDescService";
public static final String FEATURE_DESCRIPTION_SERVICE_ENDPOINT_NAME = "FeatureDescService";
+ public static final String TOPOLOGY_EXECUTION_SERVICE_ENDPOINT_NAME = "TopologyExecutionService";
+ public static final String TOPOLOGY_OPERATION_SERVICE_ENDPOINT_NAME = "TopologyOperationService";
+ public static final String TOPOLOGY_DESCRIPTION_SERVICE_ENDPOINT_NAME = "TopologyDescriptionService";
+
public static final String GENERIC_RESOURCE_SERVICE_ENDPOINT_NAME = "GenericResourceService";
public final static String AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME = "AggregateDefinitionService";
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/config/EagleConfigConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/config/EagleConfigConstants.java b/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/config/EagleConfigConstants.java
index 0a035b8..26d7b49 100644
--- a/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/config/EagleConfigConstants.java
+++ b/eagle-core/eagle-query/eagle-common/src/main/java/org/apache/eagle/common/config/EagleConfigConstants.java
@@ -58,5 +58,8 @@ public final class EagleConfigConstants {
public final static String WEB_CONFIG = "web";
public final static String APP_CONFIG = "app";
+ public final static String CLASSIFICATION_CONFIG = "classification";
+ public final static String LOCAL_MODE = "local";
+ public final static String CLUSTER_MODE = "cluster";
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/ecf75b28/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/operation/CompiledQuery.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/operation/CompiledQuery.java b/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/operation/CompiledQuery.java
index 34761cb..e3368ab 100644
--- a/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/operation/CompiledQuery.java
+++ b/eagle-core/eagle-query/eagle-storage-base/src/main/java/org/apache/eagle/storage/operation/CompiledQuery.java
@@ -214,10 +214,23 @@ public class CompiledQuery {
EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName(serviceName);
if(ed.isTimeSeries()){
// TODO check Time exists for timeseries or topology data
- this.searchCondition.setStartTime(this.rawQuery.getStartTime());
- this.searchCondition.setEndTime(this.rawQuery.getEndTime());
- this.setStartTime(DateTimeUtil.humanDateToSeconds(this.getRawQuery().getStartTime()) * 1000);
- this.setEndTime(DateTimeUtil.humanDateToSeconds(this.getRawQuery().getEndTime()) * 1000);
+ long endTimeMillis = System.currentTimeMillis();
+ long startTimeMills = endTimeMillis - 30 * DateTimeUtil.ONEDAY;
+ String endTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(endTimeMillis);
+ String startTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(startTimeMills);
+
+ if(this.rawQuery.getStartTime() != null && this.rawQuery.getEndTime() != null) {
+ endTime = this.rawQuery.getEndTime();
+ startTime = this.rawQuery.getStartTime();
+ endTimeMillis = DateTimeUtil.humanDateToSeconds(endTime) * 1000;
+ startTimeMills = DateTimeUtil.humanDateToSeconds(startTime) * 1000;
+ } else {
+ LOG.warn("startTime or endTime is not given, use [currentSystemTime - 30 days, currentSystemTime]");
+ }
+ this.searchCondition.setStartTime(startTime);
+ this.searchCondition.setEndTime(endTime);
+ this.setStartTime(startTimeMills);
+ this.setEndTime(endTimeMillis);
}else{
this.searchCondition.setStartTime("0");
this.searchCondition.setEndTime("1");