You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/10/30 13:31:50 UTC
[1/2] git commit: support cluster created when auto-recovery case in
the member fault
Repository: stratos
Updated Branches:
refs/heads/4.0.0-grouping 58f42cee3 -> 65d96bce5
support cluster created when auto-recovery case in the member fault
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/65d96bce
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/65d96bce
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/65d96bce
Branch: refs/heads/4.0.0-grouping
Commit: 65d96bce5237a4bee999fe35d21956d886dca1cc
Parents: 2414bca
Author: reka <rt...@gmail.com>
Authored: Thu Oct 30 18:01:17 2014 +0530
Committer: reka <rt...@gmail.com>
Committed: Thu Oct 30 18:01:38 2014 +0530
----------------------------------------------------------------------
.../grouping/topic/StatusEventPublisher.java | 24 ++++++++++++++++++++
.../monitor/ParentComponentMonitor.java | 2 ++
.../status/checker/StatusChecker.java | 19 ++++++++++++++--
3 files changed, 43 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/65d96bce/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java
index 7bbe8ce..8058c85 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/grouping/topic/StatusEventPublisher.java
@@ -18,6 +18,30 @@ import java.util.Set;
public class StatusEventPublisher {
private static final Log log = LogFactory.getLog(StatusEventPublisher.class);
+ public static void sendClusterCreatedEvent(String appId, String serviceName, String clusterId) {
+ try {
+ TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
+ Service service = TopologyManager.getTopology().getService(serviceName);
+ if (service != null) {
+ Cluster cluster = service.getCluster(clusterId);
+ if (cluster.isStateTransitionValid(ClusterStatus.Active)) {
+ if (log.isInfoEnabled()) {
+ log.info("Publishing Cluster activated event for [application]: " + appId +
+ " [cluster]: " + clusterId);
+ }
+ AppStatusClusterActivatedEvent clusterActivatedEvent =
+ new AppStatusClusterActivatedEvent(appId, serviceName, clusterId);
+
+ publishEvent(clusterActivatedEvent);
+ } else {
+ log.warn("Active is not in the possible state list of [cluster] " + clusterId);
+ }
+ }
+ } finally {
+ TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
+ }
+ }
+
public static void sendClusterActivatedEvent(String appId, String serviceName, String clusterId) {
try {
TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/65d96bce/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
index ced698c..91d5275 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ParentComponentMonitor.java
@@ -205,6 +205,8 @@ public abstract class ParentComponentMonitor extends Monitor {
//Find the non existent monitor by traversing dependency tree
try {
this.startDependencyOnTermination();
+ List<ApplicationContext> applicationContexts = this.dependencyTree.
+ getStarAbleDependenciesByTermination();
} catch (TopologyInConsistentException e) {
//TODO revert the siblings and notify parent, change a flag for reverting/un-subscription
log.error("Error while starting the monitor upon termination" + e);
http://git-wip-us.apache.org/repos/asf/stratos/blob/65d96bce/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
index bc08c56..97f6476 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
@@ -89,12 +89,24 @@ public class StatusChecker {
TopologyManager.acquireReadLockForCluster(monitor.getServiceId(), monitor.getClusterId());
Service service = TopologyManager.getTopology().getService(monitor.getServiceId());
Cluster cluster;
+ String appId = monitor.getAppId();
if (service != null) {
cluster = service.getCluster(monitor.getClusterId());
if (cluster != null) {
+ try {
+
+ TopologyManager.acquireReadLockForApplication(appId);
+ Application application = TopologyManager.getTopology().getApplication(appId);
+
if (!clusterMonitorHasMembers && cluster.getStatus() == ClusterStatus.Terminating) {
- StatusEventPublisher.sendClusterTerminatedEvent(monitor.getAppId(), monitor.getServiceId(),
- monitor.getClusterId());
+ if(application.getStatus() == ApplicationStatus.Terminating) {
+ StatusEventPublisher.sendClusterTerminatedEvent(appId, monitor.getServiceId(),
+ monitor.getClusterId());
+ } else {
+ StatusEventPublisher.sendClusterCreatedEvent(appId, monitor.getServiceId(),
+ monitor.getClusterId());
+ }
+
} else {
log.info("Cluster has non terminated [members] and in the [status] "
+ cluster.getStatus().toString());
@@ -107,6 +119,9 @@ public class StatusChecker {
}*/
}
+ } finally {
+ TopologyManager.releaseReadLockForApplication(appId);
+ }
}
}
[2/2] git commit: adding created events for cluster and group in
topology and in app statuc topics
Posted by re...@apache.org.
adding created events for cluster and group in topology and in app statuc topics
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/2414bca4
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/2414bca4
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/2414bca4
Branch: refs/heads/4.0.0-grouping
Commit: 2414bca4bf4ffcd373f010156c7d683afd11567d
Parents: 58f42ce
Author: reka <rt...@gmail.com>
Authored: Thu Oct 30 17:57:09 2014 +0530
Committer: reka <rt...@gmail.com>
Committed: Thu Oct 30 18:01:38 2014 +0530
----------------------------------------------------------------------
.../status/AppStatusClusterCreatedEvent.java | 50 +++++++++
.../status/AppStatusGroupCreatedEvent.java | 44 ++++++++
.../event/topology/GroupCreatedEvent.java | 43 ++++++++
.../AppStatusClusterCreatedEventListener.java | 24 ++++
.../AppStatusGroupCreatedEventListener.java | 24 ++++
.../topology/GroupCreatedEventListener.java | 27 +++++
...AppStatusClusterCreatedMessageProcessor.java | 58 ++++++++++
.../AppStatusGroupCreatedMessageProcessor.java | 62 +++++++++++
.../status/AppStatusMessageProcessorChain.java | 15 ++-
.../topology/GroupCreatedProcessor.java | 109 +++++++++++++++++++
.../topology/TopologyMessageProcessorChain.java | 6 +
11 files changed, 461 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java
new file mode 100644
index 0000000..6480af2
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusClusterCreatedEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.application.status;
+
+/**
+ * This event is fired by cartridge agent when it has started the server and
+ * applications are ready to serve the incoming requests.
+ */
+public class AppStatusClusterCreatedEvent extends StatusEvent {
+ private static final long serialVersionUID = 2625412714611885089L;
+
+ private final String serviceName;
+ private final String clusterId;
+ private String appId;
+
+ public AppStatusClusterCreatedEvent(String appId, String serviceName, String clusterId) {
+ this.serviceName = serviceName;
+ this.clusterId = clusterId;
+ this.appId = appId;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public String getClusterId() {
+ return clusterId;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java
new file mode 100644
index 0000000..04ee30e
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/application/status/AppStatusGroupCreatedEvent.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.event.application.status;
+
+/**
+ * This event is fired by cartridge agent when it has started the server and
+ * applications are ready to serve the incoming requests.
+ */
+public class AppStatusGroupCreatedEvent extends StatusEvent {
+ private static final long serialVersionUID = 2625412714611885089L;
+
+ private String groupId;
+ private String appId;
+
+ public AppStatusGroupCreatedEvent(String appId, String groupId) {
+ this.appId = appId;
+ this.groupId = groupId;
+ }
+
+ public String getGroupId() {
+ return this.groupId;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java
new file mode 100644
index 0000000..e3794f0
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/GroupCreatedEvent.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.event.topology;
+
+import org.apache.stratos.messaging.event.Event;
+
+/**
+ * Group Activated Event which will be sent to Topology upon group activation
+ */
+public class GroupCreatedEvent extends Event {
+ private String appId;
+ private String groupId;
+
+ public GroupCreatedEvent(String appId, String groupId) {
+ this.appId = appId;
+ this.groupId = groupId;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public String getGroupId() {
+ return groupId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java
new file mode 100644
index 0000000..c0c62f9
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusClusterCreatedEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.application.status;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class AppStatusClusterCreatedEventListener extends EventListener{
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java
new file mode 100644
index 0000000..82386a3
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/application/status/AppStatusGroupCreatedEventListener.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.application.status;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+public abstract class AppStatusGroupCreatedEventListener extends EventListener {
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java
new file mode 100644
index 0000000..3fb2d11
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/listener/topology/GroupCreatedEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.listener.topology;
+
+import org.apache.stratos.messaging.listener.EventListener;
+
+/**
+ * This will get triggered by the groups activation processor after processing the event
+ */
+public abstract class GroupCreatedEventListener extends EventListener {
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java
new file mode 100644
index 0000000..a743c43
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusClusterCreatedMessageProcessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.application.status;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.application.status.AppStatusClusterActivatedEvent;
+import org.apache.stratos.messaging.event.application.status.AppStatusClusterCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+
+public class AppStatusClusterCreatedMessageProcessor extends MessageProcessor {
+ private static final Log log = LogFactory.getLog(AppStatusClusterCreatedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (AppStatusClusterCreatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ AppStatusClusterCreatedEvent event = (AppStatusClusterCreatedEvent) Util.jsonToObject(message, AppStatusClusterCreatedEvent.class);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Received AppStatusClusterCreatedEvent: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(String.format("Failed to process cluster created message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java
new file mode 100644
index 0000000..b9a1c6d
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusGroupCreatedMessageProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.application.status;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.application.status.AppStatusGroupActivatedEvent;
+import org.apache.stratos.messaging.event.application.status.AppStatusGroupCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.util.Util;
+
+public class AppStatusGroupCreatedMessageProcessor extends MessageProcessor {
+ private static final Log log =
+ LogFactory.getLog(AppStatusGroupCreatedMessageProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ if (AppStatusGroupCreatedEvent.class.getName().equals(type)) {
+ // Parse complete message and build event
+ AppStatusGroupCreatedEvent event =
+ (AppStatusGroupCreatedEvent) Util.jsonToObject(message, AppStatusGroupCreatedEvent.class);
+
+ if (log.isDebugEnabled()) {
+ log.debug("Received AppStatusGroupCreatedEvent: " + event.toString());
+ }
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ } else {
+ if (nextProcessor != null) {
+ return nextProcessor.process(type, message, object);
+ } else {
+ throw new RuntimeException(
+ String.format("Failed to process group created message " +
+ "using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java
index 14b8bc2..34cd02b 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/application/status/AppStatusMessageProcessorChain.java
@@ -31,10 +31,12 @@ import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
public class AppStatusMessageProcessorChain extends MessageProcessorChain {
private static final Log log = LogFactory.getLog(AppStatusMessageProcessorChain.class);
+ private AppStatusClusterCreatedMessageProcessor clusterCreatedMessageProcessor;
private AppStatusClusterActivatedMessageProcessor clusterActivatedMessageProcessor;
private AppStatusClusterInactivateMessageProcessor clusterInActivateMessageProcessor;
private AppStatusClusterTerminatingMessageProcessor clusterTerminatingMessageProcessor;
private AppStatusClusterTerminatedMessageProcessor clusterTerminatedMessageProcessor;
+ private AppStatusGroupCreatedMessageProcessor groupCreatedMessageProcessor;
private AppStatusGroupActivatedMessageProcessor groupActivatedMessageProcessor;
private AppStatusGroupInactivatedMessageProcessor groupInActivateMessageProcessor;
private AppStatusApplicationActivatedMessageProcessor appActivatedMessageProcessor;
@@ -48,6 +50,9 @@ public class AppStatusMessageProcessorChain extends MessageProcessorChain {
public void initialize() {
// Add instance notifier event processors
+ clusterCreatedMessageProcessor= new AppStatusClusterCreatedMessageProcessor();
+ add(clusterCreatedMessageProcessor);
+
clusterActivatedMessageProcessor = new AppStatusClusterActivatedMessageProcessor();
add(clusterActivatedMessageProcessor);
@@ -56,9 +61,13 @@ public class AppStatusMessageProcessorChain extends MessageProcessorChain {
clusterTerminatingMessageProcessor = new AppStatusClusterTerminatingMessageProcessor();
add(clusterTerminatingMessageProcessor);
+
clusterTerminatedMessageProcessor = new AppStatusClusterTerminatedMessageProcessor();
add(clusterTerminatedMessageProcessor);
+ groupCreatedMessageProcessor = new AppStatusGroupCreatedMessageProcessor();
+ add(groupCreatedMessageProcessor);
+
groupActivatedMessageProcessor = new AppStatusGroupActivatedMessageProcessor();
add(groupActivatedMessageProcessor);
@@ -92,10 +101,14 @@ public class AppStatusMessageProcessorChain extends MessageProcessorChain {
}
public void addEventListener(EventListener eventListener) {
- if (eventListener instanceof AppStatusClusterActivatedEventListener) {
+ if(eventListener instanceof AppStatusClusterCreatedEventListener) {
+ clusterCreatedMessageProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof AppStatusClusterActivatedEventListener) {
clusterActivatedMessageProcessor.addEventListener(eventListener);
} else if (eventListener instanceof AppStatusClusterInactivateEventListener) {
clusterInActivateMessageProcessor.addEventListener(eventListener);
+ } else if(eventListener instanceof AppStatusGroupCreatedEventListener) {
+ groupCreatedMessageProcessor.addEventListener(eventListener);
} else if (eventListener instanceof AppStatusGroupActivatedEventListener) {
groupActivatedMessageProcessor.addEventListener(eventListener);
} else if(eventListener instanceof AppStatusClusterTerminatedEventListener){
http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
new file mode 100644
index 0000000..4a8a744
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/GroupCreatedProcessor.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.messaging.message.processor.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Application;
+import org.apache.stratos.messaging.domain.topology.Group;
+import org.apache.stratos.messaging.domain.topology.GroupStatus;
+import org.apache.stratos.messaging.domain.topology.Topology;
+import org.apache.stratos.messaging.event.topology.ClusterCreatedEvent;
+import org.apache.stratos.messaging.event.topology.GroupActivatedEvent;
+import org.apache.stratos.messaging.event.topology.GroupCreatedEvent;
+import org.apache.stratos.messaging.message.processor.MessageProcessor;
+import org.apache.stratos.messaging.message.processor.topology.updater.TopologyUpdater;
+import org.apache.stratos.messaging.util.Util;
+
+/**
+ * This processor will act upon the Group activation events
+ */
+public class GroupCreatedProcessor extends MessageProcessor {
+ private static final Log log = LogFactory.getLog(GroupCreatedProcessor.class);
+ private MessageProcessor nextProcessor;
+
+ @Override
+ public void setNext(MessageProcessor nextProcessor) {
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public boolean process(String type, String message, Object object) {
+ Topology topology = (Topology) object;
+
+ if (GroupCreatedEvent.class.getName().equals(type)) {
+ // Return if topology has not been initialized
+ if (!topology.isInitialized())
+ return false;
+
+ // Parse complete message and build event
+ GroupCreatedEvent event = (GroupCreatedEvent) Util.
+ jsonToObject(message, GroupCreatedEvent.class);
+
+ TopologyUpdater.acquireWriteLockForApplication(event.getAppId());
+
+ try {
+ return doProcess(event, topology);
+
+ } finally {
+ TopologyUpdater.releaseWriteLockForApplication(event.getAppId());
+ }
+
+ } else {
+ if (nextProcessor != null) {
+ // ask the next processor to take care of the message.
+ return nextProcessor.process(type, message, topology);
+ } else {
+ throw new RuntimeException(String.format("Failed to process message using available message processors: [type] %s [body] %s", type, message));
+ }
+ }
+ }
+
+ private boolean doProcess (GroupCreatedEvent event,Topology topology) {
+
+ // Validate event against the existing topology
+ Application application = topology.getApplication(event.getAppId());
+ if (application == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Application does not exist: [service] %s",
+ event.getAppId()));
+ }
+ return false;
+ }
+ Group group = application.getGroupRecursively(event.getGroupId());
+
+ if (group == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Group not exists in service: [AppId] %s [groupId] %s", event.getAppId(),
+ event.getGroupId()));
+ }
+ } else {
+ // Apply changes to the topology
+ if (!group.isStateTransitionValid(GroupStatus.Created)) {
+ log.error("Invalid State Transition from " + group.getStatus() + " to " + GroupStatus.Active);
+ }
+ group.setStatus(GroupStatus.Created);
+
+ }
+
+ // Notify event listeners
+ notifyEventListeners(event);
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/2414bca4/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
index 4f6d3a9..1ed5576 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/processor/topology/TopologyMessageProcessorChain.java
@@ -46,6 +46,7 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
private MemberMaintenanceModeProcessor memberMaintenanceModeProcessor;
private MemberSuspendedMessageProcessor memberSuspendedMessageProcessor;
private MemberTerminatedMessageProcessor memberTerminatedMessageProcessor;
+ private GroupCreatedProcessor groupCreatedProcessor;
private GroupActivatedProcessor groupActivatedProcessor;
private GroupInActivateProcessor groupInActivateProcessor;
private ApplicationCreatedMessageProcessor applicationCreatedMessageProcessor;
@@ -109,6 +110,9 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
memberTerminatedMessageProcessor = new MemberTerminatedMessageProcessor();
add(memberTerminatedMessageProcessor);
+ groupCreatedProcessor = new GroupCreatedProcessor();
+ add(groupCreatedProcessor);
+
groupActivatedProcessor = new GroupActivatedProcessor();
add(groupActivatedProcessor);
@@ -179,6 +183,8 @@ public class TopologyMessageProcessorChain extends MessageProcessorChain {
memberMaintenanceModeProcessor.addEventListener(eventListener);
} else if (eventListener instanceof GroupActivatedEventListener) {
groupActivatedProcessor.addEventListener(eventListener);
+ } else if (eventListener instanceof GroupCreatedEventListener) {
+ groupCreatedProcessor.addEventListener(eventListener);
} else if (eventListener instanceof GroupInActivateEventListener) {
groupInActivateProcessor.addEventListener(eventListener);
} else if (eventListener instanceof GroupTerminatedEventListener){