You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@stratos.apache.org by Nirmal Fernando <ni...@gmail.com> on 2013/10/18 11:24:38 UTC

Do not wrap log.info logs with an if condition || Fwd: [1/2] Initial version of the lb endpoint for stratos 4.0.0

+
+    @Override
+    public void run() {
+        if (log.isInfoEnabled()) {
+            log.info("Topology event message processor started");
+            log.info("Waiting for the complete topology event message...");
+        }

This prevents these INFO logs from appearing by default: you need to
explicitly enable the INFO level if you want to see them.

So, do not wrap INFO logs in an 'if' check, but do wrap DEBUG logs.

+        while (true) {
+            try {
+                // First take the complete topology event
+                String json = TopologyEventQueue.getInstance().take();
+
+                // Read message header and identify event
+                EventMessageHeader header = readHeader(json);
+                if
(header.getEventClassName().equals(CompleteTopologyEvent.class.getName())) {
+                    if (log.isInfoEnabled()) {
+                        log.info(String.format("Event message received
from queue: %s", header.getEventClassName()));
+                    }
+                    // Parse complete message and build event
+                    TopologyEventMessage eventMessage =
(TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
+                    CompleteTopologyEvent event = (CompleteTopologyEvent)
jsonToObject(eventMessage.getBody(), CompleteTopologyEvent.class);
+
 TopologyManager.getTopology().addServices(event.getTopology().getServices());
+                    if (log.isInfoEnabled()) {
+                        log.info("Topology initialized");
+                    }
+                    break;
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+
+        while (true) {
+            try {
+                String json = TopologyEventQueue.getInstance().take();
+
+                // Read message header and identify event
+                EventMessageHeader header = readHeader(json);
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Event message received from
queue: %s", header.getEventClassName()));
+                }
+
+                if
(header.getEventClassName().equals(ServiceCreatedEvent.class.getName())) {
+                    // Parse complete message and build event
+                    TopologyEventMessage eventMessage =
(TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
+                    ServiceCreatedEvent event = (ServiceCreatedEvent)
jsonToObject(eventMessage.getBody(), ServiceCreatedEvent.class);
+
+                    // Validate event against the existing topology
+                    try {
+                        TopologyManager.acquireReadLock();
+                        if
(TopologyManager.getTopology().serviceExists(event.getServiceName())) {
+                            throw new
RuntimeException(String.format("Service %s already exists",
event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    // Apply changes to the topology
+                    try {
+                        Service service = new Service();
+                        service.setServiceName(event.getServiceName());
+                        TopologyManager.acquireWriteLock();
+                        TopologyManager.getTopology().addService(service);
+                    }
+                    finally {
+                        TopologyManager.releaseWriteLock();
+                    }
+
+                    if (log.isInfoEnabled()) {
+                        log.info(String.format("Service %s created",
event.getServiceName()));
+                    }
+                } else if
(header.getEventClassName().equals(ServiceRemovedEvent.class.getName())) {
+                    // Parse complete message and build event
+                    TopologyEventMessage eventMessage =
(TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
+                    ServiceRemovedEvent event = (ServiceRemovedEvent)
jsonToObject(eventMessage.getBody(), ServiceRemovedEvent.class);
+
+                    // Validate event against the existing topology
+                    try {
+                        TopologyManager.acquireReadLock();
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        if (service == null) {
+                            throw new
RuntimeException(String.format("Service %s does not exist",
event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    // Apply changes to the topology
+                    try {
+                        TopologyManager.acquireWriteLock();
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+
 TopologyManager.getTopology().removeService(service);
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Service %s removed",
event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseWriteLock();
+                    }
+                } else if
(header.getEventClassName().equals(ClusterCreatedEvent.class.getName())) {
+                    // Parse complete message and build event
+                    TopologyEventMessage eventMessage =
(TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
+                    ClusterCreatedEvent event = (ClusterCreatedEvent)
jsonToObject(eventMessage.getBody(), ClusterCreatedEvent.class);
+
+                    // Validate event against the existing topology
+                    try {
+                        TopologyManager.acquireReadLock();
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        if (service == null) {
+                            throw new
RuntimeException(String.format("Service %s does not exist",
event.getServiceName()));
+                        }
+                        if (service.clusterExists(event.getClusterId())) {
+                            throw new
RuntimeException(String.format("Cluster %s already exists in service %s",
event.getClusterId(), event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    // Apply changes to the topology
+                    try {
+                        TopologyManager.acquireWriteLock();
+                        Cluster cluster = new Cluster();
+                        cluster.setClusterId(event.getClusterId());
+                        cluster.setHostName(event.getHostName());
+                        cluster.setTenantRange(event.getTenantRange());
+                        cluster.setCloud(event.getCloud());
+                        cluster.setRegion(event.getRegion());
+                        cluster.setZone(event.getZone());
+
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        service.addCluster(cluster);
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Cluster %s created for
service %s", event.getClusterId(), event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseWriteLock();
+                    }
+                } else if
(header.getEventClassName().endsWith(ClusterRemovedEvent.class.getName())) {
+                    // Parse complete message and build event
+                    TopologyEventMessage eventMessage =
(TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
+                    ClusterRemovedEvent event = (ClusterRemovedEvent)
jsonToObject(eventMessage.getBody(), ClusterRemovedEvent.class);
+
+                    // Validate event against the existing topology
+                    try {
+                        TopologyManager.acquireReadLock();
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        if (service == null) {
+                            throw new
RuntimeException(String.format("Service %s does not exist",
event.getServiceName()));
+                        }
+                        if (!service.clusterExists(event.getClusterId())) {
+                            throw new
RuntimeException(String.format("Cluster %s does not exist in service %s",
event.getClusterId(), event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    // Apply changes to the topology
+                    try {
+                        TopologyManager.acquireWriteLock();
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        service.removeCluster(event.getClusterId());
+
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Cluster %s removed
from service %s", event.getClusterId(), event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseWriteLock();
+                    }
+                } else if
(header.getEventClassName().endsWith(MemberStartedEvent.class.getName())) {
+                    // Parse complete message and build event
+                    TopologyEventMessage eventMessage =
(TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
+                    MemberStartedEvent event = (MemberStartedEvent)
jsonToObject(eventMessage.getBody(), MemberStartedEvent.class);
+
+                    // Validate event against the existing topology
+                    try {
+                        TopologyManager.acquireReadLock();
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        if (service == null) {
+                            throw new
RuntimeException(String.format("Service %s does not exist",
event.getServiceName()));
+                        }
+                        Cluster cluster =
service.getCluster(event.getClusterId());
+                        if (cluster == null) {
+                            throw new
RuntimeException(String.format("Cluster %s does not exist",
event.getClusterId()));
+                        }
+                        if (cluster.memberExists(event.getMemberId())) {
+                            throw new
RuntimeException(String.format("Member %s already exist in cluster %s of
service %s", event.getMemberId(), event.getClusterId(),
event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    // Apply changes to the topology
+                    try {
+                        TopologyManager.acquireWriteLock();
+
+                        Member member = new Member();
+                        member.setServiceName(event.getServiceName());
+                        member.setClusterId(event.getClusterId());
+                        member.setMemberId(event.getMemberId());
+                        member.setHostName(event.getHostName());
+                        member.setStatus(MemberStatus.Starting);
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        Cluster cluster =
service.getCluster(event.getClusterId());
+                        cluster.addMember(member);
+
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member %s started in
cluster %s of service %s", event.getMemberId(), event.getClusterId(),
event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseWriteLock();
+                    }
+                } else if
(header.getEventClassName().endsWith(MemberActivatedEvent.class.getName()))
{
+                    // Parse complete message and build event
+                    TopologyEventMessage eventMessage =
(TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
+                    MemberActivatedEvent event = (MemberActivatedEvent)
jsonToObject(eventMessage.getBody(), MemberActivatedEvent.class);
+
+                    // Validate event against the existing topology
+                    try {
+                        TopologyManager.acquireReadLock();
+
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        if (service == null) {
+                            throw new
RuntimeException(String.format("Service %s does not exist",
event.getServiceName()));
+                        }
+                        Cluster cluster =
service.getCluster(event.getClusterId());
+                        if (cluster == null) {
+                            throw new
RuntimeException(String.format("Cluster %s does not exist",
event.getClusterId()));
+                        }
+                        Member member =
cluster.getMember(event.getMemberId());
+                        if (member == null) {
+                            throw new
RuntimeException(String.format("Member %s does not exist",
event.getMemberId()));
+                        }
+                        if(member.getStatus() == MemberStatus.Activated) {
+                            throw new
RuntimeException(String.format("Member %s of cluster %s of service %s is
already activated", event.getMemberId(), event.getClusterId(),
event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    // Apply changes to the topology
+                    try {
+                        TopologyManager.acquireWriteLock();
+
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        Cluster cluster =
service.getCluster(event.getClusterId());
+                        Member member =
cluster.getMember(event.getMemberId());
+                        member.setStatus(MemberStatus.Activated);
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member %s activated in
cluster %s of service %s", event.getMemberId(), event.getClusterId(),
event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseWriteLock();
+                    }
+                } else if
(header.getEventClassName().endsWith(MemberSuspendedEvent.class.getName()))
{
+                    // Parse complete message and build event
+                    TopologyEventMessage eventMessage =
(TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
+                    MemberSuspendedEvent event = (MemberSuspendedEvent)
jsonToObject(eventMessage.getBody(), MemberSuspendedEvent.class);
+
+                    // Validate event against the existing topology
+                    try {
+                        TopologyManager.acquireReadLock();
+
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        if (service == null) {
+                            throw new
RuntimeException(String.format("Service %s does not exist",
event.getServiceName()));
+                        }
+                        Cluster cluster =
service.getCluster(event.getClusterId());
+                        if (cluster == null) {
+                            throw new
RuntimeException(String.format("Cluster %s does not exist",
event.getClusterId()));
+                        }
+                        Member member =
cluster.getMember(event.getMemberId());
+                        if (member == null) {
+                            throw new
RuntimeException(String.format("Member %s does not exist",
event.getMemberId()));
+                        }
+                        if(member.getStatus() == MemberStatus.Suspended) {
+                            throw new
RuntimeException(String.format("Member %s of cluster %s of service %s is
already suspended", event.getMemberId(), event.getClusterId(),
event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    // Apply changes to the topology
+                    try {
+                        TopologyManager.acquireWriteLock();
+
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        Cluster cluster =
service.getCluster(event.getClusterId());
+                        Member member =
cluster.getMember(event.getMemberId());
+                        member.setStatus(MemberStatus.Suspended);
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member %s suspended in
cluster %s of service %s", event.getMemberId(), event.getClusterId(),
event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseWriteLock();
+                    }
+                } else if
(header.getEventClassName().endsWith(MemberTerminatedEvent.class.getName()))
{
+                    // Parse complete message and build event
+                    TopologyEventMessage eventMessage =
(TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
+                    MemberTerminatedEvent event = (MemberTerminatedEvent)
jsonToObject(eventMessage.getBody(), MemberTerminatedEvent.class);
+
+                    // Validate event against the existing topology
+                    try {
+                        TopologyManager.acquireReadLock();
+
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        if (service == null) {
+                            throw new
RuntimeException(String.format("Service %s does not exist",
event.getServiceName()));
+                        }
+                        Cluster cluster =
service.getCluster(event.getClusterId());
+                        if (cluster == null) {
+                            throw new
RuntimeException(String.format("Cluster %s does not exist",
event.getClusterId()));
+                        }
+                        Member member =
cluster.getMember(event.getMemberId());
+                        if (member == null) {
+                            throw new
RuntimeException(String.format("Member %s does not exist",
event.getMemberId()));
+                        }
+                        if(member.getStatus() == MemberStatus.Terminated) {
+                            throw new
RuntimeException(String.format("Member %s of cluster %s of service %s is
already terminated", event.getMemberId(), event.getClusterId(),
event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseReadLock();
+                    }
+
+                    // Apply changes to the topology
+                    try {
+                        TopologyManager.acquireWriteLock();
+
+                        Service service =
TopologyManager.getTopology().getService(event.getServiceName());
+                        Cluster cluster =
service.getCluster(event.getClusterId());
+                        Member member =
cluster.getMember(event.getMemberId());
+                        member.setStatus(MemberStatus.Terminated);
+                        if (log.isInfoEnabled()) {
+                            log.info(String.format("Member %s terminated
in cluster %s of service %s", event.getMemberId(), event.getClusterId(),
event.getServiceName()));
+                        }
+                    }
+                    finally {
+                        TopologyManager.releaseWriteLock();
+                    }
+                }
+
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
new file mode 100644
index 0000000..8040314
--- /dev/null
+++
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.lb.endpoint.topology;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import org.apache.stratos.lb.endpoint.LoadBalancerContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TopologyEventMessageReceiver implements MessageListener { // JMS listener that copies the text of received messages into TopologyEventQueue
+
+    private static final Log log =
LogFactory.getLog(TopologyEventMessageReceiver.class);
+
+    @Override
+    public void onMessage(Message message) { // messages that are not TextMessage are ignored (there is no else branch)
+        if (message instanceof TextMessage) {
+            TextMessage receivedMessage = (TextMessage) message;
+            try {
+                if (log.isDebugEnabled()) {
+                    log.debug("Message received: " + ((TextMessage)
message).getText());
+                }
+                // Add received message to the queue
+
 TopologyEventQueue.getInstance().add(receivedMessage.getText());
+
+            } catch (JMSException e) { // the error is logged only; in that case nothing is added to the queue
+                log.error(e.getMessage(), e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
new file mode 100644
index 0000000..c2cebff
--- /dev/null
+++
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint.topology;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Implements topology event queue.
+ *
+ * A singleton {@link LinkedBlockingQueue} of String elements. The
+ * constructor is private; the shared instance is created on the first
+ * call to {@link #getInstance()} and returned by every later call.
+ */
+public class TopologyEventQueue extends LinkedBlockingQueue<String>{
+    private static volatile TopologyEventQueue instance;
+
+    private TopologyEventQueue(){
+    }
+
+    public static synchronized TopologyEventQueue getInstance() { // returns the single shared queue, creating it if needed
+        if (instance == null) {
+            synchronized (TopologyEventQueue.class){ // NOTE(review): looks redundant - a static synchronized method already holds this class monitor
+                if (instance == null) {
+                    instance = new TopologyEventQueue ();
+                }
+            }
+        }
+        return instance;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
new file mode 100644
index 0000000..a27df39
--- /dev/null
+++
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint.topology;
+
+import org.apache.stratos.messaging.domain.topology.Topology;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class TopologyManager { // static holder for one Topology plus a read/write lock that callers acquire and release explicitly
+    private static volatile Topology topology;
+    private static volatile ReentrantReadWriteLock lock = new
ReentrantReadWriteLock();
+    private static volatile ReentrantReadWriteLock.ReadLock readLock =
lock.readLock();
+    private static volatile ReentrantReadWriteLock.WriteLock writeLock =
lock.writeLock();
+
+    public static void acquireReadLock() { // delegates to readLock.lock(); callers must pair it with releaseReadLock()
+        readLock.lock();
+    }
+
+    public static void releaseReadLock() { // delegates to readLock.unlock()
+        readLock.unlock();
+    }
+
+    public static void acquireWriteLock() { // delegates to writeLock.lock(); callers must pair it with releaseWriteLock()
+        writeLock.lock();
+    }
+
+    public static void releaseWriteLock() { // delegates to writeLock.unlock()
+        writeLock.unlock();
+    }
+
+    public static synchronized Topology getTopology() { // returns the shared Topology, constructing it on the first call; takes neither the read nor the write lock itself
+        if (topology == null) {
+            synchronized (TopologyManager.class){ // NOTE(review): looks redundant - a static synchronized method already holds this class monitor
+                if (topology == null) {
+                    topology = new Topology();
+                }
+            }
+        }
+        return topology;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
new file mode 100644
index 0000000..3d19b50
--- /dev/null
+++
b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint.util;
+
+import org.apache.axis2.clustering.Member;
+import org.apache.stratos.messaging.domain.topology.Port;
+
+/**
+ * Implements domain model transformation logic.
+ *
+ * {@link #transform} builds an Axis2 clustering Member from a topology
+ * Member: host name, the values of its "HTTP" and "HTTPS" ports, the
+ * active flag and the properties are copied across.
+ *
+ * NOTE(review): the results of getPort("HTTP") and getPort("HTTPS") are
+ * dereferenced without a null check, so a missing port would raise a
+ * NullPointerException if getPort can return null - confirm.
+ * NOTE(review): the domain is set from the host name - confirm that this
+ * is intended.
+ */
+public class Transformer {
+    public static Member
transform(org.apache.stratos.messaging.domain.topology.Member
topologyMember) {
+        Port httpPort = topologyMember.getPort("HTTP");
+        Port httpsPort = topologyMember.getPort("HTTPS");
+
+        Member member = new Member(topologyMember.getHostName(),
httpPort.getValue());
+        member.setDomain(topologyMember.getHostName());
+        member.setHttpPort(httpPort.getValue());
+        member.setHttpsPort(httpsPort.getValue());
+        member.setActive(topologyMember.isActive());
+        member.setProperties(topologyMember.getProperties());
+        return  member;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
----------------------------------------------------------------------
diff --git
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
index 471c91a..2e838e9 100644
---
a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
+++
b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
@@ -43,7 +43,7 @@ public class Service {
         this.serviceName = serviceName;
     }

-    public Collection<Cluster> getClusterMap() {
+    public Collection<Cluster> getClusters() {
         return clusterMap.values();
     }





-- 
Best Regards,
Nirmal

Nirmal Fernando.
PPMC Member & Committer of Apache Stratos,
Senior Software Engineer, WSO2 Inc.

Blog: http://nirmalfdo.blogspot.com/

Re: Do not wrap log.info logs with an if condition || Fwd: [1/2] Initial version of the lb endpoint for stratos 4.0.0

Posted by Nirmal Fernando <ni...@gmail.com>.
On Fri, Oct 18, 2013 at 6:53 PM, Isuru Perera <is...@wso2.com> wrote:

> Hi Nirmal,
>
> On Fri, Oct 18, 2013 at 6:36 PM, Nirmal Fernando <ni...@gmail.com>wrote:
>
>> Isuru,
>>
>>
>> On Fri, Oct 18, 2013 at 6:23 PM, Isuru Perera <is...@wso2.com> wrote:
>>
>>> For me, it is always better to have a guard condition before logging.
>>>
>>> Someone might want to enable only ERROR level logs in production.
>>>
>>
>> If you explicitly enable only ERROR logs, INFO logs won't get printed. I
>> just verified this.
>>
> That's true. And that's why guard conditions are there, so that we can
> avoid the overhead of logging something if the corresponding log level is
> disabled.
>

Ah.. ya .. true. I will add them back where necessary (where we use String
operations etc.).


>
>>> Speaking of logging, I think we should use SLF4J [1].
>>>
>>
>> +1, I've also noticed a lot of good things in SLF4J. For example, AFAIR,
>> with SLF4J you don't need to clutter the code with those
>> log.isDebugEnabled checks.
>>
> We would still need guard conditions with SLF4J. Refer to this SO
> question [1] regarding this.
>
> One thing I like is that we can easily use parametrized log messages.
>
> Another thing is that we can name the logger "logger" rather than
> "log", i.e. Logger logger = LoggerFactory.getLogger(Foo.class) instead of
> Log log = LogFactory.getLog(Foo.class) in Apache Commons Logging (which is
> what we use currently).
>
> [1]
> http://stackoverflow.com/questions/8444266/even-with-slf4j-should-you-guard-your-logging
>
>>
>> I'm just a big fan of it.
>>>
>>> [1] http://www.slf4j.org/
>>>
>>>
>>> On Fri, Oct 18, 2013 at 3:30 PM, Nirmal Fernando <nirmal070125@gmail.com
>>> > wrote:
>>>
>>>>
>>>>
>>>>
>>>> On Fri, Oct 18, 2013 at 2:54 PM, Nirmal Fernando <
>>>> nirmal070125@gmail.com> wrote:
>>>>
>>>>>
>>>>> +
>>>>> +    @Override
>>>>> +    public void run() {
>>>>> +        if (log.isInfoEnabled()) {
>>>>> +            log.info("Topology event message processor started");
>>>>> +            log.info("Waiting for the complete topology event
>>>>> message...");
>>>>> +        }
>>>>>
>>>>> This makes these info logs not appear by default. You need to
>>>>> explicitly specify INFO level, if you want to see these logs.
>>>>>
>>>>
>>>> Above statement is not correct. Apologies!
>>>>
>>>> Still, the suggestion is valid, you do not need to check whether the
>>>> INFO log is enabled IMO.
>>>>
>>>>>
>>>>> So, do not wrap INFO logs with an 'IF'. But wrap DEBUG logs.
>>>>>
>>>>> +        while (true) {
>>>>> +            try {
>>>>> +                // First take the complete topology event
>>>>> +                String json = TopologyEventQueue.getInstance().take();
>>>>> +
>>>>> +                // Read message header and identify event
>>>>> +                EventMessageHeader header = readHeader(json);
>>>>> +                if
>>>>> (header.getEventClassName().equals(CompleteTopologyEvent.class.getName())) {
>>>>> +                    if (log.isInfoEnabled()) {
>>>>> +                        log.info(String.format("Event message
>>>>> received from queue: %s", header.getEventClassName()));
>>>>> +                    }
>>>>> +                    // Parse complete message and build event
>>>>> +                    TopologyEventMessage eventMessage =
>>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>>> +                    CompleteTopologyEvent event =
>>>>> (CompleteTopologyEvent) jsonToObject(eventMessage.getBody(),
>>>>> CompleteTopologyEvent.class);
>>>>> +
>>>>>  TopologyManager.getTopology().addServices(event.getTopology().getServices());
>>>>> +                    if (log.isInfoEnabled()) {
>>>>> +                        log.info("Topology initialized");
>>>>> +                    }
>>>>> +                    break;
>>>>> +                }
>>>>> +            } catch (Exception e) {
>>>>> +                e.printStackTrace();
>>>>> +            }
>>>>> +        }
>>>>> +
>>>>> +        while (true) {
>>>>> +            try {
>>>>> +                String json = TopologyEventQueue.getInstance().take();
>>>>> +
>>>>> +                // Read message header and identify event
>>>>> +                EventMessageHeader header = readHeader(json);
>>>>> +                if (log.isInfoEnabled()) {
>>>>> +                    log.info(String.format("Event message received
>>>>> from queue: %s", header.getEventClassName()));
>>>>> +                }
>>>>> +
>>>>> +                if
>>>>> (header.getEventClassName().equals(ServiceCreatedEvent.class.getName())) {
>>>>> +                    // Parse complete message and build event
>>>>> +                    TopologyEventMessage eventMessage =
>>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>>> +                    ServiceCreatedEvent event = (ServiceCreatedEvent)
>>>>> jsonToObject(eventMessage.getBody(), ServiceCreatedEvent.class);
>>>>> +
>>>>> +                    // Validate event against the existing topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireReadLock();
>>>>> +                        if
>>>>> (TopologyManager.getTopology().serviceExists(event.getServiceName())) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Service %s already exists",
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseReadLock();
>>>>> +                    }
>>>>> +
>>>>> +                    // Apply changes to the topology
>>>>> +                    try {
>>>>> +                        Service service = new Service();
>>>>> +
>>>>>  service.setServiceName(event.getServiceName());
>>>>> +                        TopologyManager.acquireWriteLock();
>>>>> +
>>>>>  TopologyManager.getTopology().addService(service);
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseWriteLock();
>>>>> +                    }
>>>>> +
>>>>> +                    if (log.isInfoEnabled()) {
>>>>> +                        log.info(String.format("Service %s created",
>>>>> event.getServiceName()));
>>>>> +                    }
>>>>> +                } else if
>>>>> (header.getEventClassName().equals(ServiceRemovedEvent.class.getName())) {
>>>>> +                    // Parse complete message and build event
>>>>> +                    TopologyEventMessage eventMessage =
>>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>>> +                    ServiceRemovedEvent event = (ServiceRemovedEvent)
>>>>> jsonToObject(eventMessage.getBody(), ServiceRemovedEvent.class);
>>>>> +
>>>>> +                    // Validate event against the existing topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireReadLock();
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        if (service == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Service %s does not exist",
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseReadLock();
>>>>> +                    }
>>>>> +
>>>>> +                    // Apply changes to the topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireWriteLock();
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +
>>>>>  TopologyManager.getTopology().removeService(service);
>>>>> +                        if (log.isInfoEnabled()) {
>>>>> +                            log.info(String.format("Service %s
>>>>> removed", event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseWriteLock();
>>>>> +                    }
>>>>> +                } else if
>>>>> (header.getEventClassName().equals(ClusterCreatedEvent.class.getName())) {
>>>>> +                    // Parse complete message and build event
>>>>> +                    TopologyEventMessage eventMessage =
>>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>>> +                    ClusterCreatedEvent event = (ClusterCreatedEvent)
>>>>> jsonToObject(eventMessage.getBody(), ClusterCreatedEvent.class);
>>>>> +
>>>>> +                    // Validate event against the existing topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireReadLock();
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        if (service == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Service %s does not exist",
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                        if
>>>>> (service.clusterExists(event.getClusterId())) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Cluster %s already exists in service %s",
>>>>> event.getClusterId(), event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseReadLock();
>>>>> +                    }
>>>>> +
>>>>> +                    // Apply changes to the topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireWriteLock();
>>>>> +                        Cluster cluster = new Cluster();
>>>>> +                        cluster.setClusterId(event.getClusterId());
>>>>> +                        cluster.setHostName(event.getHostName());
>>>>> +
>>>>>  cluster.setTenantRange(event.getTenantRange());
>>>>> +                        cluster.setCloud(event.getCloud());
>>>>> +                        cluster.setRegion(event.getRegion());
>>>>> +                        cluster.setZone(event.getZone());
>>>>> +
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        service.addCluster(cluster);
>>>>> +                        if (log.isInfoEnabled()) {
>>>>> +                            log.info(String.format("Cluster %s
>>>>> created for service %s", event.getClusterId(), event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseWriteLock();
>>>>> +                    }
>>>>> +                } else if
>>>>> (header.getEventClassName().endsWith(ClusterRemovedEvent.class.getName())) {
>>>>> +                    // Parse complete message and build event
>>>>> +                    TopologyEventMessage eventMessage =
>>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>>> +                    ClusterRemovedEvent event = (ClusterRemovedEvent)
>>>>> jsonToObject(eventMessage.getBody(), ClusterRemovedEvent.class);
>>>>> +
>>>>> +                    // Validate event against the existing topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireReadLock();
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        if (service == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Service %s does not exist",
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                        if
>>>>> (!service.clusterExists(event.getClusterId())) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Cluster %s does not exist in service %s",
>>>>> event.getClusterId(), event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseReadLock();
>>>>> +                    }
>>>>> +
>>>>> +                    // Apply changes to the topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireWriteLock();
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        service.removeCluster(event.getClusterId());
>>>>> +
>>>>> +                        if (log.isInfoEnabled()) {
>>>>> +                            log.info(String.format("Cluster %s
>>>>> removed from service %s", event.getClusterId(), event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseWriteLock();
>>>>> +                    }
>>>>> +                } else if
>>>>> (header.getEventClassName().endsWith(MemberStartedEvent.class.getName())) {
>>>>> +                    // Parse complete message and build event
>>>>> +                    TopologyEventMessage eventMessage =
>>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>>> +                    MemberStartedEvent event = (MemberStartedEvent)
>>>>> jsonToObject(eventMessage.getBody(), MemberStartedEvent.class);
>>>>> +
>>>>> +                    // Validate event against the existing topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireReadLock();
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        if (service == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Service %s does not exist",
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                        Cluster cluster =
>>>>> service.getCluster(event.getClusterId());
>>>>> +                        if (cluster == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Cluster %s does not exist",
>>>>> event.getClusterId()));
>>>>> +                        }
>>>>> +                        if
>>>>> (cluster.memberExists(event.getMemberId())) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Member %s already exist in cluster %s of
>>>>> service %s", event.getMemberId(), event.getClusterId(),
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseReadLock();
>>>>> +                    }
>>>>> +
>>>>> +                    // Apply changes to the topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireWriteLock();
>>>>> +
>>>>> +                        Member member = new Member();
>>>>> +                        member.setServiceName(event.getServiceName());
>>>>> +                        member.setClusterId(event.getClusterId());
>>>>> +                        member.setMemberId(event.getMemberId());
>>>>> +                        member.setHostName(event.getHostName());
>>>>> +                        member.setStatus(MemberStatus.Starting);
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        Cluster cluster =
>>>>> service.getCluster(event.getClusterId());
>>>>> +                        cluster.addMember(member);
>>>>> +
>>>>> +                        if (log.isInfoEnabled()) {
>>>>> +                            log.info(String.format("Member %s
>>>>> started in cluster %s of service %s", event.getMemberId(),
>>>>> event.getClusterId(), event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseWriteLock();
>>>>> +                    }
>>>>> +                } else if
>>>>> (header.getEventClassName().endsWith(MemberActivatedEvent.class.getName()))
>>>>> {
>>>>> +                    // Parse complete message and build event
>>>>> +                    TopologyEventMessage eventMessage =
>>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>>> +                    MemberActivatedEvent event =
>>>>> (MemberActivatedEvent) jsonToObject(eventMessage.getBody(),
>>>>> MemberActivatedEvent.class);
>>>>> +
>>>>> +                    // Validate event against the existing topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireReadLock();
>>>>> +
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        if (service == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Service %s does not exist",
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                        Cluster cluster =
>>>>> service.getCluster(event.getClusterId());
>>>>> +                        if (cluster == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Cluster %s does not exist",
>>>>> event.getClusterId()));
>>>>> +                        }
>>>>> +                        Member member =
>>>>> cluster.getMember(event.getMemberId());
>>>>> +                        if (member == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Member %s does not exist",
>>>>> event.getMemberId()));
>>>>> +                        }
>>>>> +                        if(member.getStatus() ==
>>>>> MemberStatus.Activated) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>>>>> already activated", event.getMemberId(), event.getClusterId(),
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseReadLock();
>>>>> +                    }
>>>>> +
>>>>> +                    // Apply changes to the topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireWriteLock();
>>>>> +
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        Cluster cluster =
>>>>> service.getCluster(event.getClusterId());
>>>>> +                        Member member =
>>>>> cluster.getMember(event.getMemberId());
>>>>> +                        member.setStatus(MemberStatus.Activated);
>>>>> +                        if (log.isInfoEnabled()) {
>>>>> +                            log.info(String.format("Member %s
>>>>> activated in cluster %s of service %s", event.getMemberId(),
>>>>> event.getClusterId(), event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseWriteLock();
>>>>> +                    }
>>>>> +                } else if
>>>>> (header.getEventClassName().endsWith(MemberSuspendedEvent.class.getName()))
>>>>> {
>>>>> +                    // Parse complete message and build event
>>>>> +                    TopologyEventMessage eventMessage =
>>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>>> +                    MemberSuspendedEvent event =
>>>>> (MemberSuspendedEvent) jsonToObject(eventMessage.getBody(),
>>>>> MemberSuspendedEvent.class);
>>>>> +
>>>>> +                    // Validate event against the existing topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireReadLock();
>>>>> +
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        if (service == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Service %s does not exist",
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                        Cluster cluster =
>>>>> service.getCluster(event.getClusterId());
>>>>> +                        if (cluster == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Cluster %s does not exist",
>>>>> event.getClusterId()));
>>>>> +                        }
>>>>> +                        Member member =
>>>>> cluster.getMember(event.getMemberId());
>>>>> +                        if (member == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Member %s does not exist",
>>>>> event.getMemberId()));
>>>>> +                        }
>>>>> +                        if(member.getStatus() ==
>>>>> MemberStatus.Suspended) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>>>>> already suspended", event.getMemberId(), event.getClusterId(),
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseReadLock();
>>>>> +                    }
>>>>> +
>>>>> +                    // Apply changes to the topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireWriteLock();
>>>>> +
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        Cluster cluster =
>>>>> service.getCluster(event.getClusterId());
>>>>> +                        Member member =
>>>>> cluster.getMember(event.getMemberId());
>>>>> +                        member.setStatus(MemberStatus.Suspended);
>>>>> +                        if (log.isInfoEnabled()) {
>>>>> +                            log.info(String.format("Member %s
>>>>> suspended in cluster %s of service %s", event.getMemberId(),
>>>>> event.getClusterId(), event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseWriteLock();
>>>>> +                    }
>>>>> +                } else if
>>>>> (header.getEventClassName().endsWith(MemberTerminatedEvent.class.getName()))
>>>>> {
>>>>> +                    // Parse complete message and build event
>>>>> +                    TopologyEventMessage eventMessage =
>>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>>> +                    MemberTerminatedEvent event =
>>>>> (MemberTerminatedEvent) jsonToObject(eventMessage.getBody(),
>>>>> MemberTerminatedEvent.class);
>>>>> +
>>>>> +                    // Validate event against the existing topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireReadLock();
>>>>> +
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        if (service == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Service %s does not exist",
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                        Cluster cluster =
>>>>> service.getCluster(event.getClusterId());
>>>>> +                        if (cluster == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Cluster %s does not exist",
>>>>> event.getClusterId()));
>>>>> +                        }
>>>>> +                        Member member =
>>>>> cluster.getMember(event.getMemberId());
>>>>> +                        if (member == null) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Member %s does not exist",
>>>>> event.getMemberId()));
>>>>> +                        }
>>>>> +                        if(member.getStatus() ==
>>>>> MemberStatus.Terminated) {
>>>>> +                            throw new
>>>>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>>>>> already terminated", event.getMemberId(), event.getClusterId(),
>>>>> event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseReadLock();
>>>>> +                    }
>>>>> +
>>>>> +                    // Apply changes to the topology
>>>>> +                    try {
>>>>> +                        TopologyManager.acquireWriteLock();
>>>>> +
>>>>> +                        Service service =
>>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>>> +                        Cluster cluster =
>>>>> service.getCluster(event.getClusterId());
>>>>> +                        Member member =
>>>>> cluster.getMember(event.getMemberId());
>>>>> +                        member.setStatus(MemberStatus.Terminated);
>>>>> +                        if (log.isInfoEnabled()) {
>>>>> +                            log.info(String.format("Member %s
>>>>> terminated in cluster %s of service %s", event.getMemberId(),
>>>>> event.getClusterId(), event.getServiceName()));
>>>>> +                        }
>>>>> +                    }
>>>>> +                    finally {
>>>>> +                        TopologyManager.releaseWriteLock();
>>>>> +                    }
>>>>> +                }
>>>>> +
>>>>> +            } catch (Exception e) {
>>>>> +                e.printStackTrace();
>>>>> +            }
>>>>> +        }
>>>>> +    }
>>>>> +}
>>>>>
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git
>>>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>>>> new file mode 100644
>>>>> index 0000000..8040314
>>>>> --- /dev/null
>>>>> +++
>>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>>>> @@ -0,0 +1,50 @@
>>>>> +/*
>>>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>>>> + * or more contributor license agreements.  See the NOTICE file
>>>>> + * distributed with this work for additional information
>>>>> + * regarding copyright ownership.  The ASF licenses this file
>>>>> + * to you under the Apache License, Version 2.0 (the
>>>>> + * "License"); you may not use this file except in compliance
>>>>> + * with the License.  You may obtain a copy of the License at
>>>>> + *
>>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing,
>>>>> + * software distributed under the License is distributed on an
>>>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>>> + * KIND, either express or implied.  See the License for the
>>>>> + * specific language governing permissions and limitations
>>>>> + * under the License.
>>>>> + */
>>>>> +package org.apache.stratos.lb.endpoint.topology;
>>>>> +
>>>>> +import javax.jms.JMSException;
>>>>> +import javax.jms.Message;
>>>>> +import javax.jms.MessageListener;
>>>>> +import javax.jms.TextMessage;
>>>>> +
>>>>> +import org.apache.stratos.lb.endpoint.LoadBalancerContext;
>>>>> +import org.apache.commons.logging.Log;
>>>>> +import org.apache.commons.logging.LogFactory;
>>>>> +
>>>>> +public class TopologyEventMessageReceiver implements MessageListener {
>>>>> +
>>>>> +    private static final Log log =
>>>>> LogFactory.getLog(TopologyEventMessageReceiver.class);
>>>>> +
>>>>> +    @Override
>>>>> +    public void onMessage(Message message) {
>>>>> +        if (message instanceof TextMessage) {
>>>>> +            TextMessage receivedMessage = (TextMessage) message;
>>>>> +            try {
>>>>> +                if (log.isDebugEnabled()) {
>>>>> +                    log.debug("Message received: " + ((TextMessage)
>>>>> message).getText());
>>>>> +                }
>>>>> +                // Add received message to the queue
>>>>> +
>>>>>  TopologyEventQueue.getInstance().add(receivedMessage.getText());
>>>>> +
>>>>> +            } catch (JMSException e) {
>>>>> +                log.error(e.getMessage(), e);
>>>>> +            }
>>>>> +        }
>>>>> +    }
>>>>> +}
>>>>>
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git
>>>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>>>> new file mode 100644
>>>>> index 0000000..c2cebff
>>>>> --- /dev/null
>>>>> +++
>>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>>>> @@ -0,0 +1,44 @@
>>>>> +/*
>>>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>>>> + * or more contributor license agreements.  See the NOTICE file
>>>>> + * distributed with this work for additional information
>>>>> + * regarding copyright ownership.  The ASF licenses this file
>>>>> + * to you under the Apache License, Version 2.0 (the
>>>>> + * "License"); you may not use this file except in compliance
>>>>> + * with the License.  You may obtain a copy of the License at
>>>>> + *
>>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing,
>>>>> + * software distributed under the License is distributed on an
>>>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>>> + * KIND, either express or implied.  See the License for the
>>>>> + * specific language governing permissions and limitations
>>>>> + * under the License.
>>>>> + */
>>>>> +
>>>>> +package org.apache.stratos.lb.endpoint.topology;
>>>>> +
>>>>> +import java.util.concurrent.BlockingQueue;
>>>>> +import java.util.concurrent.LinkedBlockingQueue;
>>>>> +
>>>>> +/**
>>>>> + * Implements topology event queue.
>>>>> + */
>>>>> +public class TopologyEventQueue extends LinkedBlockingQueue<String>{
>>>>> +    private static volatile TopologyEventQueue instance;
>>>>> +
>>>>> +    private TopologyEventQueue(){
>>>>> +    }
>>>>> +
>>>>> +    public static synchronized TopologyEventQueue getInstance() {
>>>>> +        if (instance == null) {
>>>>> +            synchronized (TopologyEventQueue.class){
>>>>> +                if (instance == null) {
>>>>> +                    instance = new TopologyEventQueue ();
>>>>> +                }
>>>>> +            }
>>>>> +        }
>>>>> +        return instance;
>>>>> +    }
>>>>> +}
>>>>>
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git
>>>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>>>> new file mode 100644
>>>>> index 0000000..a27df39
>>>>> --- /dev/null
>>>>> +++
>>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>>>> @@ -0,0 +1,57 @@
>>>>> +/*
>>>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>>>> + * or more contributor license agreements.  See the NOTICE file
>>>>> + * distributed with this work for additional information
>>>>> + * regarding copyright ownership.  The ASF licenses this file
>>>>> + * to you under the Apache License, Version 2.0 (the
>>>>> + * "License"); you may not use this file except in compliance
>>>>> + * with the License.  You may obtain a copy of the License at
>>>>> + *
>>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing,
>>>>> + * software distributed under the License is distributed on an
>>>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>>> + * KIND, either express or implied.  See the License for the
>>>>> + * specific language governing permissions and limitations
>>>>> + * under the License.
>>>>> + */
>>>>> +
>>>>> +package org.apache.stratos.lb.endpoint.topology;
>>>>> +
>>>>> +import org.apache.stratos.messaging.domain.topology.Topology;
>>>>> +import java.util.concurrent.locks.ReentrantReadWriteLock;
>>>>> +
>>>>> +public class TopologyManager {
>>>>> +    private static volatile Topology topology;
>>>>> +    private static volatile ReentrantReadWriteLock lock = new
>>>>> ReentrantReadWriteLock();
>>>>> +    private static volatile ReentrantReadWriteLock.ReadLock readLock
>>>>> = lock.readLock();
>>>>> +    private static volatile ReentrantReadWriteLock.WriteLock
>>>>> writeLock = lock.writeLock();
>>>>> +
>>>>> +    public static void acquireReadLock() {
>>>>> +        readLock.lock();
>>>>> +    }
>>>>> +
>>>>> +    public static void releaseReadLock() {
>>>>> +        readLock.unlock();
>>>>> +    }
>>>>> +
>>>>> +    public static void acquireWriteLock() {
>>>>> +        writeLock.lock();
>>>>> +    }
>>>>> +
>>>>> +    public static void releaseWriteLock() {
>>>>> +        writeLock.unlock();
>>>>> +    }
>>>>> +
>>>>> +    public static synchronized Topology getTopology() {
>>>>> +        if (topology == null) {
>>>>> +            synchronized (TopologyManager.class){
>>>>> +                if (topology == null) {
>>>>> +                    topology = new Topology();
>>>>> +                }
>>>>> +            }
>>>>> +        }
>>>>> +        return topology;
>>>>> +    }
>>>>> +}
>>>>>
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git
>>>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>>>> new file mode 100644
>>>>> index 0000000..3d19b50
>>>>> --- /dev/null
>>>>> +++
>>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>>>> @@ -0,0 +1,41 @@
>>>>> +/*
>>>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>>>> + * or more contributor license agreements.  See the NOTICE file
>>>>> + * distributed with this work for additional information
>>>>> + * regarding copyright ownership.  The ASF licenses this file
>>>>> + * to you under the Apache License, Version 2.0 (the
>>>>> + * "License"); you may not use this file except in compliance
>>>>> + * with the License.  You may obtain a copy of the License at
>>>>> + *
>>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>>> + *
>>>>> + * Unless required by applicable law or agreed to in writing,
>>>>> + * software distributed under the License is distributed on an
>>>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>>> + * KIND, either express or implied.  See the License for the
>>>>> + * specific language governing permissions and limitations
>>>>> + * under the License.
>>>>> + */
>>>>> +
>>>>> +package org.apache.stratos.lb.endpoint.util;
>>>>> +
>>>>> +import org.apache.axis2.clustering.Member;
>>>>> +import org.apache.stratos.messaging.domain.topology.Port;
>>>>> +
>>>>> +/**
>>>>> + * Implements domain model transformation logic.
>>>>> + */
>>>>> +public class Transformer {
>>>>> +    public static Member
>>>>> transform(org.apache.stratos.messaging.domain.topology.Member
>>>>> topologyMember) {
>>>>> +        Port httpPort = topologyMember.getPort("HTTP");
>>>>> +        Port httpsPort = topologyMember.getPort("HTTPS");
>>>>> +
>>>>> +        Member member = new Member(topologyMember.getHostName(),
>>>>> httpPort.getValue());
>>>>> +        member.setDomain(topologyMember.getHostName());
>>>>> +        member.setHttpPort(httpPort.getValue());
>>>>> +        member.setHttpsPort(httpsPort.getValue());
>>>>> +        member.setActive(topologyMember.isActive());
>>>>> +        member.setProperties(topologyMember.getProperties());
>>>>> +        return  member;
>>>>> +    }
>>>>> +}
>>>>>
>>>>>
>>>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>>>> ----------------------------------------------------------------------
>>>>> diff --git
>>>>> a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>>>> b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>>>> index 471c91a..2e838e9 100644
>>>>> ---
>>>>> a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>>>> +++
>>>>> b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>>>> @@ -43,7 +43,7 @@ public class Service {
>>>>>          this.serviceName = serviceName;
>>>>>      }
>>>>>
>>>>> -    public Collection<Cluster> getClusterMap() {
>>>>> +    public Collection<Cluster> getClusters() {
>>>>>          return clusterMap.values();
>>>>>      }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Best Regards,
>>>>> Nirmal
>>>>>
>>>>> Nirmal Fernando.
>>>>> PPMC Member & Committer of Apache Stratos,
>>>>> Senior Software Engineer, WSO2 Inc.
>>>>>
>>>>> Blog: http://nirmalfdo.blogspot.com/
>>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Nirmal
>>>>
>>>> Nirmal Fernando.
>>>> PPMC Member & Committer of Apache Stratos,
>>>> Senior Software Engineer, WSO2 Inc.
>>>>
>>>> Blog: http://nirmalfdo.blogspot.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Isuru Perera
>>> Senior Software Engineer | WSO2, Inc. | http://wso2.com/
>>> Lean . Enterprise . Middleware
>>>
>>> about.me/chrishantha
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Nirmal
>>
>> Nirmal Fernando.
>> PPMC Member & Committer of Apache Stratos,
>> Senior Software Engineer, WSO2 Inc.
>>
>> Blog: http://nirmalfdo.blogspot.com/
>>
>
>
>
> --
> Isuru Perera
> Senior Software Engineer | WSO2, Inc. | http://wso2.com/
> Lean . Enterprise . Middleware
>
> about.me/chrishantha
>



-- 
Best Regards,
Nirmal

Nirmal Fernando.
PPMC Member & Committer of Apache Stratos,
Senior Software Engineer, WSO2 Inc.

Blog: http://nirmalfdo.blogspot.com/

Re: Do not wrap log.info logs with an if condition || Fwd: [1/2] Initial version of the lb endpoint for stratos 4.0.0

Posted by Isuru Perera <is...@wso2.com>.
Hi Nirmal,

On Fri, Oct 18, 2013 at 6:36 PM, Nirmal Fernando <ni...@gmail.com> wrote:

> Isuru,
>
>
> On Fri, Oct 18, 2013 at 6:23 PM, Isuru Perera <is...@wso2.com> wrote:
>
>> For me, it is always better to have a guard condition before logging.
>>
>> Someone might want to enable only ERROR level logs in production.
>>
>
> If you explicitly enable only ERROR logs, INFO logs won't get printed. I
> just verified this.
>
That's true. And that's why guard conditions are there: so that we can
avoid the overhead of constructing a log message when the corresponding log
level is disabled.

>
>> Speaking of logging, I think we should use SLF4J [1].
>>
>
> +1, I've also noticed a lot of good things in SLF4J. For example, AFAIR,
> with SLF4J you don't need to clutter the code with those log.isDebugEnabled
> checks.
>
We would still need guard conditions with SLF4J. See the Stack Overflow
question [1] on this.

One thing I like is that we can easily use parametrized log messages.

Another thing is that we can name the logger "logger" rather than "log",
i.e. Logger logger = LoggerFactory.getLogger(Foo.class) instead of
Log log = LogFactory.getLog(Foo.class) as in Apache Commons Logging (which is
what we use currently).

[1]
http://stackoverflow.com/questions/8444266/even-with-slf4j-should-you-guard-your-logging

>
> I'm just a big fan of it.
>>
>> [1] http://www.slf4j.org/
>>
>>
>> On Fri, Oct 18, 2013 at 3:30 PM, Nirmal Fernando <ni...@gmail.com>wrote:
>>
>>>
>>>
>>>
>>> On Fri, Oct 18, 2013 at 2:54 PM, Nirmal Fernando <nirmal070125@gmail.com
>>> > wrote:
>>>
>>>>
>>>> +
>>>> +    @Override
>>>> +    public void run() {
>>>> +        if (log.isInfoEnabled()) {
>>>> +            log.info("Topology event message processor started");
>>>> +            log.info("Waiting for the complete topology event
>>>> message...");
>>>> +        }
>>>>
>>>> This makes these info logs not appear by default. You need to
>>>> explicitly specify INFO level, if you want to see these logs.
>>>>
>>>
>>> Above statement is not correct. Apologies!
>>>
>>> Still, the suggestion is valid, you do not need to check whether the
>>> INFO log is enabled IMO.
>>>
>>>>
>>>> So, do not wrap INFO logs with an 'IF'. But wrap DEBUG logs.
>>>>
>>>> +        while (true) {
>>>> +            try {
>>>> +                // First take the complete topology event
>>>> +                String json = TopologyEventQueue.getInstance().take();
>>>> +
>>>> +                // Read message header and identify event
>>>> +                EventMessageHeader header = readHeader(json);
>>>> +                if
>>>> (header.getEventClassName().equals(CompleteTopologyEvent.class.getName())) {
>>>> +                    if (log.isInfoEnabled()) {
>>>> +                        log.info(String.format("Event message
>>>> received from queue: %s", header.getEventClassName()));
>>>> +                    }
>>>> +                    // Parse complete message and build event
>>>> +                    TopologyEventMessage eventMessage =
>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>> +                    CompleteTopologyEvent event =
>>>> (CompleteTopologyEvent) jsonToObject(eventMessage.getBody(),
>>>> CompleteTopologyEvent.class);
>>>> +
>>>>  TopologyManager.getTopology().addServices(event.getTopology().getServices());
>>>> +                    if (log.isInfoEnabled()) {
>>>> +                        log.info("Topology initialized");
>>>> +                    }
>>>> +                    break;
>>>> +                }
>>>> +            } catch (Exception e) {
>>>> +                e.printStackTrace();
>>>> +            }
>>>> +        }
>>>> +
>>>> +        while (true) {
>>>> +            try {
>>>> +                String json = TopologyEventQueue.getInstance().take();
>>>> +
>>>> +                // Read message header and identify event
>>>> +                EventMessageHeader header = readHeader(json);
>>>> +                if (log.isInfoEnabled()) {
>>>> +                    log.info(String.format("Event message received
>>>> from queue: %s", header.getEventClassName()));
>>>> +                }
>>>> +
>>>> +                if
>>>> (header.getEventClassName().equals(ServiceCreatedEvent.class.getName())) {
>>>> +                    // Parse complete message and build event
>>>> +                    TopologyEventMessage eventMessage =
>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>> +                    ServiceCreatedEvent event = (ServiceCreatedEvent)
>>>> jsonToObject(eventMessage.getBody(), ServiceCreatedEvent.class);
>>>> +
>>>> +                    // Validate event against the existing topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireReadLock();
>>>> +                        if
>>>> (TopologyManager.getTopology().serviceExists(event.getServiceName())) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Service %s already exists",
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseReadLock();
>>>> +                    }
>>>> +
>>>> +                    // Apply changes to the topology
>>>> +                    try {
>>>> +                        Service service = new Service();
>>>> +                        service.setServiceName(event.getServiceName());
>>>> +                        TopologyManager.acquireWriteLock();
>>>> +
>>>>  TopologyManager.getTopology().addService(service);
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseWriteLock();
>>>> +                    }
>>>> +
>>>> +                    if (log.isInfoEnabled()) {
>>>> +                        log.info(String.format("Service %s created",
>>>> event.getServiceName()));
>>>> +                    }
>>>> +                } else if
>>>> (header.getEventClassName().equals(ServiceRemovedEvent.class.getName())) {
>>>> +                    // Parse complete message and build event
>>>> +                    TopologyEventMessage eventMessage =
>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>> +                    ServiceRemovedEvent event = (ServiceRemovedEvent)
>>>> jsonToObject(eventMessage.getBody(), ServiceRemovedEvent.class);
>>>> +
>>>> +                    // Validate event against the existing topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireReadLock();
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        if (service == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Service %s does not exist",
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseReadLock();
>>>> +                    }
>>>> +
>>>> +                    // Apply changes to the topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireWriteLock();
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +
>>>>  TopologyManager.getTopology().removeService(service);
>>>> +                        if (log.isInfoEnabled()) {
>>>> +                            log.info(String.format("Service %s
>>>> removed", event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseWriteLock();
>>>> +                    }
>>>> +                } else if
>>>> (header.getEventClassName().equals(ClusterCreatedEvent.class.getName())) {
>>>> +                    // Parse complete message and build event
>>>> +                    TopologyEventMessage eventMessage =
>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>> +                    ClusterCreatedEvent event = (ClusterCreatedEvent)
>>>> jsonToObject(eventMessage.getBody(), ClusterCreatedEvent.class);
>>>> +
>>>> +                    // Validate event against the existing topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireReadLock();
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        if (service == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Service %s does not exist",
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                        if
>>>> (service.clusterExists(event.getClusterId())) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Cluster %s already exists in service %s",
>>>> event.getClusterId(), event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseReadLock();
>>>> +                    }
>>>> +
>>>> +                    // Apply changes to the topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireWriteLock();
>>>> +                        Cluster cluster = new Cluster();
>>>> +                        cluster.setClusterId(event.getClusterId());
>>>> +                        cluster.setHostName(event.getHostName());
>>>> +                        cluster.setTenantRange(event.getTenantRange());
>>>> +                        cluster.setCloud(event.getCloud());
>>>> +                        cluster.setRegion(event.getRegion());
>>>> +                        cluster.setZone(event.getZone());
>>>> +
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        service.addCluster(cluster);
>>>> +                        if (log.isInfoEnabled()) {
>>>> +                            log.info(String.format("Cluster %s
>>>> created for service %s", event.getClusterId(), event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseWriteLock();
>>>> +                    }
>>>> +                } else if
>>>> (header.getEventClassName().endsWith(ClusterRemovedEvent.class.getName())) {
>>>> +                    // Parse complete message and build event
>>>> +                    TopologyEventMessage eventMessage =
>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>> +                    ClusterRemovedEvent event = (ClusterRemovedEvent)
>>>> jsonToObject(eventMessage.getBody(), ClusterRemovedEvent.class);
>>>> +
>>>> +                    // Validate event against the existing topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireReadLock();
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        if (service == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Service %s does not exist",
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                        if
>>>> (!service.clusterExists(event.getClusterId())) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Cluster %s does not exist in service %s",
>>>> event.getClusterId(), event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseReadLock();
>>>> +                    }
>>>> +
>>>> +                    // Apply changes to the topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireWriteLock();
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        service.removeCluster(event.getClusterId());
>>>> +
>>>> +                        if (log.isInfoEnabled()) {
>>>> +                            log.info(String.format("Cluster %s
>>>> removed from service %s", event.getClusterId(), event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseWriteLock();
>>>> +                    }
>>>> +                } else if
>>>> (header.getEventClassName().endsWith(MemberStartedEvent.class.getName())) {
>>>> +                    // Parse complete message and build event
>>>> +                    TopologyEventMessage eventMessage =
>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>> +                    MemberStartedEvent event = (MemberStartedEvent)
>>>> jsonToObject(eventMessage.getBody(), MemberStartedEvent.class);
>>>> +
>>>> +                    // Validate event against the existing topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireReadLock();
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        if (service == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Service %s does not exist",
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                        Cluster cluster =
>>>> service.getCluster(event.getClusterId());
>>>> +                        if (cluster == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Cluster %s does not exist",
>>>> event.getClusterId()));
>>>> +                        }
>>>> +                        if (cluster.memberExists(event.getMemberId()))
>>>> {
>>>> +                            throw new
>>>> RuntimeException(String.format("Member %s already exist in cluster %s of
>>>> service %s", event.getMemberId(), event.getClusterId(),
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseReadLock();
>>>> +                    }
>>>> +
>>>> +                    // Apply changes to the topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireWriteLock();
>>>> +
>>>> +                        Member member = new Member();
>>>> +                        member.setServiceName(event.getServiceName());
>>>> +                        member.setClusterId(event.getClusterId());
>>>> +                        member.setMemberId(event.getMemberId());
>>>> +                        member.setHostName(event.getHostName());
>>>> +                        member.setStatus(MemberStatus.Starting);
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        Cluster cluster =
>>>> service.getCluster(event.getClusterId());
>>>> +                        cluster.addMember(member);
>>>> +
>>>> +                        if (log.isInfoEnabled()) {
>>>> +                            log.info(String.format("Member %s started
>>>> in cluster %s of service %s", event.getMemberId(), event.getClusterId(),
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseWriteLock();
>>>> +                    }
>>>> +                } else if
>>>> (header.getEventClassName().endsWith(MemberActivatedEvent.class.getName()))
>>>> {
>>>> +                    // Parse complete message and build event
>>>> +                    TopologyEventMessage eventMessage =
>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>> +                    MemberActivatedEvent event =
>>>> (MemberActivatedEvent) jsonToObject(eventMessage.getBody(),
>>>> MemberActivatedEvent.class);
>>>> +
>>>> +                    // Validate event against the existing topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireReadLock();
>>>> +
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        if (service == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Service %s does not exist",
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                        Cluster cluster =
>>>> service.getCluster(event.getClusterId());
>>>> +                        if (cluster == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Cluster %s does not exist",
>>>> event.getClusterId()));
>>>> +                        }
>>>> +                        Member member =
>>>> cluster.getMember(event.getMemberId());
>>>> +                        if (member == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Member %s does not exist",
>>>> event.getMemberId()));
>>>> +                        }
>>>> +                        if(member.getStatus() ==
>>>> MemberStatus.Activated) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>>>> already activated", event.getMemberId(), event.getClusterId(),
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseReadLock();
>>>> +                    }
>>>> +
>>>> +                    // Apply changes to the topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireWriteLock();
>>>> +
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        Cluster cluster =
>>>> service.getCluster(event.getClusterId());
>>>> +                        Member member =
>>>> cluster.getMember(event.getMemberId());
>>>> +                        member.setStatus(MemberStatus.Activated);
>>>> +                        if (log.isInfoEnabled()) {
>>>> +                            log.info(String.format("Member %s
>>>> activated in cluster %s of service %s", event.getMemberId(),
>>>> event.getClusterId(), event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseWriteLock();
>>>> +                    }
>>>> +                } else if
>>>> (header.getEventClassName().endsWith(MemberSuspendedEvent.class.getName()))
>>>> {
>>>> +                    // Parse complete message and build event
>>>> +                    TopologyEventMessage eventMessage =
>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>> +                    MemberSuspendedEvent event =
>>>> (MemberSuspendedEvent) jsonToObject(eventMessage.getBody(),
>>>> MemberSuspendedEvent.class);
>>>> +
>>>> +                    // Validate event against the existing topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireReadLock();
>>>> +
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        if (service == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Service %s does not exist",
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                        Cluster cluster =
>>>> service.getCluster(event.getClusterId());
>>>> +                        if (cluster == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Cluster %s does not exist",
>>>> event.getClusterId()));
>>>> +                        }
>>>> +                        Member member =
>>>> cluster.getMember(event.getMemberId());
>>>> +                        if (member == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Member %s does not exist",
>>>> event.getMemberId()));
>>>> +                        }
>>>> +                        if(member.getStatus() ==
>>>> MemberStatus.Suspended) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>>>> already suspended", event.getMemberId(), event.getClusterId(),
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseReadLock();
>>>> +                    }
>>>> +
>>>> +                    // Apply changes to the topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireWriteLock();
>>>> +
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        Cluster cluster =
>>>> service.getCluster(event.getClusterId());
>>>> +                        Member member =
>>>> cluster.getMember(event.getMemberId());
>>>> +                        member.setStatus(MemberStatus.Suspended);
>>>> +                        if (log.isInfoEnabled()) {
>>>> +                            log.info(String.format("Member %s
>>>> suspended in cluster %s of service %s", event.getMemberId(),
>>>> event.getClusterId(), event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseWriteLock();
>>>> +                    }
>>>> +                } else if
>>>> (header.getEventClassName().endsWith(MemberTerminatedEvent.class.getName()))
>>>> {
>>>> +                    // Parse complete message and build event
>>>> +                    TopologyEventMessage eventMessage =
>>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>>> +                    MemberTerminatedEvent event =
>>>> (MemberTerminatedEvent) jsonToObject(eventMessage.getBody(),
>>>> MemberTerminatedEvent.class);
>>>> +
>>>> +                    // Validate event against the existing topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireReadLock();
>>>> +
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        if (service == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Service %s does not exist",
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                        Cluster cluster =
>>>> service.getCluster(event.getClusterId());
>>>> +                        if (cluster == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Cluster %s does not exist",
>>>> event.getClusterId()));
>>>> +                        }
>>>> +                        Member member =
>>>> cluster.getMember(event.getMemberId());
>>>> +                        if (member == null) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Member %s does not exist",
>>>> event.getMemberId()));
>>>> +                        }
>>>> +                        if(member.getStatus() ==
>>>> MemberStatus.Terminated) {
>>>> +                            throw new
>>>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>>>> already terminated", event.getMemberId(), event.getClusterId(),
>>>> event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseReadLock();
>>>> +                    }
>>>> +
>>>> +                    // Apply changes to the topology
>>>> +                    try {
>>>> +                        TopologyManager.acquireWriteLock();
>>>> +
>>>> +                        Service service =
>>>> TopologyManager.getTopology().getService(event.getServiceName());
>>>> +                        Cluster cluster =
>>>> service.getCluster(event.getClusterId());
>>>> +                        Member member =
>>>> cluster.getMember(event.getMemberId());
>>>> +                        member.setStatus(MemberStatus.Terminated);
>>>> +                        if (log.isInfoEnabled()) {
>>>> +                            log.info(String.format("Member %s
>>>> terminated in cluster %s of service %s", event.getMemberId(),
>>>> event.getClusterId(), event.getServiceName()));
>>>> +                        }
>>>> +                    }
>>>> +                    finally {
>>>> +                        TopologyManager.releaseWriteLock();
>>>> +                    }
>>>> +                }
>>>> +
>>>> +            } catch (Exception e) {
>>>> +                e.printStackTrace();
>>>> +            }
>>>> +        }
>>>> +    }
>>>> +}
>>>>
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>>> ----------------------------------------------------------------------
>>>> diff --git
>>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>>> new file mode 100644
>>>> index 0000000..8040314
>>>> --- /dev/null
>>>> +++
>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>>> @@ -0,0 +1,50 @@
>>>> +/*
>>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>>> + * or more contributor license agreements.  See the NOTICE file
>>>> + * distributed with this work for additional information
>>>> + * regarding copyright ownership.  The ASF licenses this file
>>>> + * to you under the Apache License, Version 2.0 (the
>>>> + * "License"); you may not use this file except in compliance
>>>> + * with the License.  You may obtain a copy of the License at
>>>> + *
>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>> + *
>>>> + * Unless required by applicable law or agreed to in writing,
>>>> + * software distributed under the License is distributed on an
>>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>> + * KIND, either express or implied.  See the License for the
>>>> + * specific language governing permissions and limitations
>>>> + * under the License.
>>>> + */
>>>> +package org.apache.stratos.lb.endpoint.topology;
>>>> +
>>>> +import javax.jms.JMSException;
>>>> +import javax.jms.Message;
>>>> +import javax.jms.MessageListener;
>>>> +import javax.jms.TextMessage;
>>>> +
>>>> +import org.apache.stratos.lb.endpoint.LoadBalancerContext;
>>>> +import org.apache.commons.logging.Log;
>>>> +import org.apache.commons.logging.LogFactory;
>>>> +
>>>> +public class TopologyEventMessageReceiver implements MessageListener {
>>>> +
>>>> +    private static final Log log =
>>>> LogFactory.getLog(TopologyEventMessageReceiver.class);
>>>> +
>>>> +    @Override
>>>> +    public void onMessage(Message message) {
>>>> +        if (message instanceof TextMessage) {
>>>> +            TextMessage receivedMessage = (TextMessage) message;
>>>> +            try {
>>>> +                if (log.isDebugEnabled()) {
>>>> +                    log.debug("Message received: " + ((TextMessage)
>>>> message).getText());
>>>> +                }
>>>> +                // Add received message to the queue
>>>> +
>>>>  TopologyEventQueue.getInstance().add(receivedMessage.getText());
>>>> +
>>>> +            } catch (JMSException e) {
>>>> +                log.error(e.getMessage(), e);
>>>> +            }
>>>> +        }
>>>> +    }
>>>> +}
>>>>
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>>> ----------------------------------------------------------------------
>>>> diff --git
>>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>>> new file mode 100644
>>>> index 0000000..c2cebff
>>>> --- /dev/null
>>>> +++
>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>>> @@ -0,0 +1,44 @@
>>>> +/*
>>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>>> + * or more contributor license agreements.  See the NOTICE file
>>>> + * distributed with this work for additional information
>>>> + * regarding copyright ownership.  The ASF licenses this file
>>>> + * to you under the Apache License, Version 2.0 (the
>>>> + * "License"); you may not use this file except in compliance
>>>> + * with the License.  You may obtain a copy of the License at
>>>> + *
>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>> + *
>>>> + * Unless required by applicable law or agreed to in writing,
>>>> + * software distributed under the License is distributed on an
>>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>> + * KIND, either express or implied.  See the License for the
>>>> + * specific language governing permissions and limitations
>>>> + * under the License.
>>>> + */
>>>> +
>>>> +package org.apache.stratos.lb.endpoint.topology;
>>>> +
>>>> +import java.util.concurrent.BlockingQueue;
>>>> +import java.util.concurrent.LinkedBlockingQueue;
>>>> +
>>>> +/**
>>>> + * Implements topology event queue.
>>>> + */
>>>> +public class TopologyEventQueue extends LinkedBlockingQueue<String>{
>>>> +    private static volatile TopologyEventQueue instance;
>>>> +
>>>> +    private TopologyEventQueue(){
>>>> +    }
>>>> +
>>>> +    public static synchronized TopologyEventQueue getInstance() {
>>>> +        if (instance == null) {
>>>> +            synchronized (TopologyEventQueue.class){
>>>> +                if (instance == null) {
>>>> +                    instance = new TopologyEventQueue ();
>>>> +                }
>>>> +            }
>>>> +        }
>>>> +        return instance;
>>>> +    }
>>>> +}
>>>>
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>>> ----------------------------------------------------------------------
>>>> diff --git
>>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>>> new file mode 100644
>>>> index 0000000..a27df39
>>>> --- /dev/null
>>>> +++
>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>>> @@ -0,0 +1,57 @@
>>>> +/*
>>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>>> + * or more contributor license agreements.  See the NOTICE file
>>>> + * distributed with this work for additional information
>>>> + * regarding copyright ownership.  The ASF licenses this file
>>>> + * to you under the Apache License, Version 2.0 (the
>>>> + * "License"); you may not use this file except in compliance
>>>> + * with the License.  You may obtain a copy of the License at
>>>> + *
>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>> + *
>>>> + * Unless required by applicable law or agreed to in writing,
>>>> + * software distributed under the License is distributed on an
>>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>> + * KIND, either express or implied.  See the License for the
>>>> + * specific language governing permissions and limitations
>>>> + * under the License.
>>>> + */
>>>> +
>>>> +package org.apache.stratos.lb.endpoint.topology;
>>>> +
>>>> +import org.apache.stratos.messaging.domain.topology.Topology;
>>>> +import java.util.concurrent.locks.ReentrantReadWriteLock;
>>>> +
>>>> +public class TopologyManager {
>>>> +    private static volatile Topology topology;
>>>> +    private static volatile ReentrantReadWriteLock lock = new
>>>> ReentrantReadWriteLock();
>>>> +    private static volatile ReentrantReadWriteLock.ReadLock readLock =
>>>> lock.readLock();
>>>> +    private static volatile ReentrantReadWriteLock.WriteLock writeLock
>>>> = lock.writeLock();
>>>> +
>>>> +    public static void acquireReadLock() {
>>>> +        readLock.lock();
>>>> +    }
>>>> +
>>>> +    public static void releaseReadLock() {
>>>> +        readLock.unlock();
>>>> +    }
>>>> +
>>>> +    public static void acquireWriteLock() {
>>>> +        writeLock.lock();
>>>> +    }
>>>> +
>>>> +    public static void releaseWriteLock() {
>>>> +        writeLock.unlock();
>>>> +    }
>>>> +
>>>> +    public static synchronized Topology getTopology() {
>>>> +        if (topology == null) {
>>>> +            synchronized (TopologyManager.class){
>>>> +                if (topology == null) {
>>>> +                    topology = new Topology();
>>>> +                }
>>>> +            }
>>>> +        }
>>>> +        return topology;
>>>> +    }
>>>> +}
>>>>
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>>> ----------------------------------------------------------------------
>>>> diff --git
>>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>>> new file mode 100644
>>>> index 0000000..3d19b50
>>>> --- /dev/null
>>>> +++
>>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>>> @@ -0,0 +1,41 @@
>>>> +/*
>>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>>> + * or more contributor license agreements.  See the NOTICE file
>>>> + * distributed with this work for additional information
>>>> + * regarding copyright ownership.  The ASF licenses this file
>>>> + * to you under the Apache License, Version 2.0 (the
>>>> + * "License"); you may not use this file except in compliance
>>>> + * with the License.  You may obtain a copy of the License at
>>>> + *
>>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>>> + *
>>>> + * Unless required by applicable law or agreed to in writing,
>>>> + * software distributed under the License is distributed on an
>>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>>> + * KIND, either express or implied.  See the License for the
>>>> + * specific language governing permissions and limitations
>>>> + * under the License.
>>>> + */
>>>> +
>>>> +package org.apache.stratos.lb.endpoint.util;
>>>> +
>>>> +import org.apache.axis2.clustering.Member;
>>>> +import org.apache.stratos.messaging.domain.topology.Port;
>>>> +
>>>> +/**
>>>> + * Implements domain model transformation logic.
>>>> + */
>>>> +public class Transformer {
>>>> +    public static Member
>>>> transform(org.apache.stratos.messaging.domain.topology.Member
>>>> topologyMember) {
>>>> +        Port httpPort = topologyMember.getPort("HTTP");
>>>> +        Port httpsPort = topologyMember.getPort("HTTPS");
>>>> +
>>>> +        Member member = new Member(topologyMember.getHostName(),
>>>> httpPort.getValue());
>>>> +        member.setDomain(topologyMember.getHostName());
>>>> +        member.setHttpPort(httpPort.getValue());
>>>> +        member.setHttpsPort(httpsPort.getValue());
>>>> +        member.setActive(topologyMember.isActive());
>>>> +        member.setProperties(topologyMember.getProperties());
>>>> +        return  member;
>>>> +    }
>>>> +}
>>>>
>>>>
>>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>>> ----------------------------------------------------------------------
>>>> diff --git
>>>> a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>>> b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>>> index 471c91a..2e838e9 100644
>>>> ---
>>>> a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>>> +++
>>>> b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>>> @@ -43,7 +43,7 @@ public class Service {
>>>>          this.serviceName = serviceName;
>>>>      }
>>>>
>>>> -    public Collection<Cluster> getClusterMap() {
>>>> +    public Collection<Cluster> getClusters() {
>>>>          return clusterMap.values();
>>>>      }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> --
>>>> Best Regards,
>>>> Nirmal
>>>>
>>>> Nirmal Fernando.
>>>> PPMC Member & Committer of Apache Stratos,
>>>> Senior Software Engineer, WSO2 Inc.
>>>>
>>>> Blog: http://nirmalfdo.blogspot.com/
>>>>
>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Nirmal
>>>
>>> Nirmal Fernando.
>>> PPMC Member & Committer of Apache Stratos,
>>> Senior Software Engineer, WSO2 Inc.
>>>
>>> Blog: http://nirmalfdo.blogspot.com/
>>>
>>
>>
>>
>> --
>> Isuru Perera
>> Senior Software Engineer | WSO2, Inc. | http://wso2.com/
>> Lean . Enterprise . Middleware
>>
>> about.me/chrishantha
>>
>
>
>
> --
> Best Regards,
> Nirmal
>
> Nirmal Fernando.
> PPMC Member & Committer of Apache Stratos,
> Senior Software Engineer, WSO2 Inc.
>
> Blog: http://nirmalfdo.blogspot.com/
>



-- 
Isuru Perera
Senior Software Engineer | WSO2, Inc. | http://wso2.com/
Lean . Enterprise . Middleware

about.me/chrishantha

Re: Do not wrap log.info logs with an if condition || Fwd: [1/2] Initial version of the lb endpoint for stratos 4.0.0

Posted by Nirmal Fernando <ni...@gmail.com>.
Isuru,


On Fri, Oct 18, 2013 at 6:23 PM, Isuru Perera <is...@wso2.com> wrote:

> For me, it is always better to have a guard condition before logging.
>
> Someone might want to enable only ERROR level logs in production.
>

If you explicitly enable only ERROR logs, INFO logs won't get printed. I
just verified this.

>
> Speaking of logging, I think we should use SLF4J [1].
>

+1, I've also noticed a lot of good things in SLF4J. For example, AFAIR,
with SLF4J you don't need to clutter the code with all that
log.isDebugEnabled stuff.

I'm just a big fan of it.
>
> [1] http://www.slf4j.org/
>
>
> On Fri, Oct 18, 2013 at 3:30 PM, Nirmal Fernando <ni...@gmail.com>wrote:
>
>>
>>
>>
>> On Fri, Oct 18, 2013 at 2:54 PM, Nirmal Fernando <ni...@gmail.com>wrote:
>>
>>>
>>> +
>>> +    @Override
>>> +    public void run() {
>>> +        if (log.isInfoEnabled()) {
>>> +            log.info("Topology event message processor started");
>>> +            log.info("Waiting for the complete topology event
>>> message...");
>>> +        }
>>>
>>> This makes these info logs not appear by default. You need to explicitly
>>> specify INFO level, if you want to see these logs.
>>>
>>
>> Above statement is not correct. Apologies!
>>
>> Still, the suggestion is valid: you do not need to check whether the INFO
>> log is enabled, IMO.
>>
>>>
>>> So, do not wrap INFO logs with an 'IF'. But wrap DEBUG logs.
>>>
>>> +        while (true) {
>>> +            try {
>>> +                // First take the complete topology event
>>> +                String json = TopologyEventQueue.getInstance().take();
>>> +
>>> +                // Read message header and identify event
>>> +                EventMessageHeader header = readHeader(json);
>>> +                if
>>> (header.getEventClassName().equals(CompleteTopologyEvent.class.getName())) {
>>> +                    if (log.isInfoEnabled()) {
>>> +                        log.info(String.format("Event message received
>>> from queue: %s", header.getEventClassName()));
>>> +                    }
>>> +                    // Parse complete message and build event
>>> +                    TopologyEventMessage eventMessage =
>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>> +                    CompleteTopologyEvent event =
>>> (CompleteTopologyEvent) jsonToObject(eventMessage.getBody(),
>>> CompleteTopologyEvent.class);
>>> +
>>>  TopologyManager.getTopology().addServices(event.getTopology().getServices());
>>> +                    if (log.isInfoEnabled()) {
>>> +                        log.info("Topology initialized");
>>> +                    }
>>> +                    break;
>>> +                }
>>> +            } catch (Exception e) {
>>> +                e.printStackTrace();
>>> +            }
>>> +        }
>>> +
>>> +        while (true) {
>>> +            try {
>>> +                String json = TopologyEventQueue.getInstance().take();
>>> +
>>> +                // Read message header and identify event
>>> +                EventMessageHeader header = readHeader(json);
>>> +                if (log.isInfoEnabled()) {
>>> +                    log.info(String.format("Event message received
>>> from queue: %s", header.getEventClassName()));
>>> +                }
>>> +
>>> +                if
>>> (header.getEventClassName().equals(ServiceCreatedEvent.class.getName())) {
>>> +                    // Parse complete message and build event
>>> +                    TopologyEventMessage eventMessage =
>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>> +                    ServiceCreatedEvent event = (ServiceCreatedEvent)
>>> jsonToObject(eventMessage.getBody(), ServiceCreatedEvent.class);
>>> +
>>> +                    // Validate event against the existing topology
>>> +                    try {
>>> +                        TopologyManager.acquireReadLock();
>>> +                        if
>>> (TopologyManager.getTopology().serviceExists(event.getServiceName())) {
>>> +                            throw new
>>> RuntimeException(String.format("Service %s already exists",
>>> event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseReadLock();
>>> +                    }
>>> +
>>> +                    // Apply changes to the topology
>>> +                    try {
>>> +                        Service service = new Service();
>>> +                        service.setServiceName(event.getServiceName());
>>> +                        TopologyManager.acquireWriteLock();
>>> +
>>>  TopologyManager.getTopology().addService(service);
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseWriteLock();
>>> +                    }
>>> +
>>> +                    if (log.isInfoEnabled()) {
>>> +                        log.info(String.format("Service %s created",
>>> event.getServiceName()));
>>> +                    }
>>> +                } else if
>>> (header.getEventClassName().equals(ServiceRemovedEvent.class.getName())) {
>>> +                    // Parse complete message and build event
>>> +                    TopologyEventMessage eventMessage =
>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>> +                    ServiceRemovedEvent event = (ServiceRemovedEvent)
>>> jsonToObject(eventMessage.getBody(), ServiceRemovedEvent.class);
>>> +
>>> +                    // Validate event against the existing topology
>>> +                    try {
>>> +                        TopologyManager.acquireReadLock();
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        if (service == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Service %s does not exist",
>>> event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseReadLock();
>>> +                    }
>>> +
>>> +                    // Apply changes to the topology
>>> +                    try {
>>> +                        TopologyManager.acquireWriteLock();
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +
>>>  TopologyManager.getTopology().removeService(service);
>>> +                        if (log.isInfoEnabled()) {
>>> +                            log.info(String.format("Service %s
>>> removed", event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseWriteLock();
>>> +                    }
>>> +                } else if
>>> (header.getEventClassName().equals(ClusterCreatedEvent.class.getName())) {
>>> +                    // Parse complete message and build event
>>> +                    TopologyEventMessage eventMessage =
>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>> +                    ClusterCreatedEvent event = (ClusterCreatedEvent)
>>> jsonToObject(eventMessage.getBody(), ClusterCreatedEvent.class);
>>> +
>>> +                    // Validate event against the existing topology
>>> +                    try {
>>> +                        TopologyManager.acquireReadLock();
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        if (service == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Service %s does not exist",
>>> event.getServiceName()));
>>> +                        }
>>> +                        if
>>> (service.clusterExists(event.getClusterId())) {
>>> +                            throw new
>>> RuntimeException(String.format("Cluster %s already exists in service %s",
>>> event.getClusterId(), event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseReadLock();
>>> +                    }
>>> +
>>> +                    // Apply changes to the topology
>>> +                    try {
>>> +                        TopologyManager.acquireWriteLock();
>>> +                        Cluster cluster = new Cluster();
>>> +                        cluster.setClusterId(event.getClusterId());
>>> +                        cluster.setHostName(event.getHostName());
>>> +                        cluster.setTenantRange(event.getTenantRange());
>>> +                        cluster.setCloud(event.getCloud());
>>> +                        cluster.setRegion(event.getRegion());
>>> +                        cluster.setZone(event.getZone());
>>> +
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        service.addCluster(cluster);
>>> +                        if (log.isInfoEnabled()) {
>>> +                            log.info(String.format("Cluster %s created
>>> for service %s", event.getClusterId(), event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseWriteLock();
>>> +                    }
>>> +                } else if
>>> (header.getEventClassName().endsWith(ClusterRemovedEvent.class.getName())) {
>>> +                    // Parse complete message and build event
>>> +                    TopologyEventMessage eventMessage =
>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>> +                    ClusterRemovedEvent event = (ClusterRemovedEvent)
>>> jsonToObject(eventMessage.getBody(), ClusterRemovedEvent.class);
>>> +
>>> +                    // Validate event against the existing topology
>>> +                    try {
>>> +                        TopologyManager.acquireReadLock();
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        if (service == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Service %s does not exist",
>>> event.getServiceName()));
>>> +                        }
>>> +                        if
>>> (!service.clusterExists(event.getClusterId())) {
>>> +                            throw new
>>> RuntimeException(String.format("Cluster %s does not exist in service %s",
>>> event.getClusterId(), event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseReadLock();
>>> +                    }
>>> +
>>> +                    // Apply changes to the topology
>>> +                    try {
>>> +                        TopologyManager.acquireWriteLock();
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        service.removeCluster(event.getClusterId());
>>> +
>>> +                        if (log.isInfoEnabled()) {
>>> +                            log.info(String.format("Cluster %s removed
>>> from service %s", event.getClusterId(), event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseWriteLock();
>>> +                    }
>>> +                } else if
>>> (header.getEventClassName().endsWith(MemberStartedEvent.class.getName())) {
>>> +                    // Parse complete message and build event
>>> +                    TopologyEventMessage eventMessage =
>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>> +                    MemberStartedEvent event = (MemberStartedEvent)
>>> jsonToObject(eventMessage.getBody(), MemberStartedEvent.class);
>>> +
>>> +                    // Validate event against the existing topology
>>> +                    try {
>>> +                        TopologyManager.acquireReadLock();
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        if (service == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Service %s does not exist",
>>> event.getServiceName()));
>>> +                        }
>>> +                        Cluster cluster =
>>> service.getCluster(event.getClusterId());
>>> +                        if (cluster == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Cluster %s does not exist",
>>> event.getClusterId()));
>>> +                        }
>>> +                        if (cluster.memberExists(event.getMemberId())) {
>>> +                            throw new
>>> RuntimeException(String.format("Member %s already exist in cluster %s of
>>> service %s", event.getMemberId(), event.getClusterId(),
>>> event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseReadLock();
>>> +                    }
>>> +
>>> +                    // Apply changes to the topology
>>> +                    try {
>>> +                        TopologyManager.acquireWriteLock();
>>> +
>>> +                        Member member = new Member();
>>> +                        member.setServiceName(event.getServiceName());
>>> +                        member.setClusterId(event.getClusterId());
>>> +                        member.setMemberId(event.getMemberId());
>>> +                        member.setHostName(event.getHostName());
>>> +                        member.setStatus(MemberStatus.Starting);
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        Cluster cluster =
>>> service.getCluster(event.getClusterId());
>>> +                        cluster.addMember(member);
>>> +
>>> +                        if (log.isInfoEnabled()) {
>>> +                            log.info(String.format("Member %s started
>>> in cluster %s of service %s", event.getMemberId(), event.getClusterId(),
>>> event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseWriteLock();
>>> +                    }
>>> +                } else if
>>> (header.getEventClassName().endsWith(MemberActivatedEvent.class.getName()))
>>> {
>>> +                    // Parse complete message and build event
>>> +                    TopologyEventMessage eventMessage =
>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>> +                    MemberActivatedEvent event = (MemberActivatedEvent)
>>> jsonToObject(eventMessage.getBody(), MemberActivatedEvent.class);
>>> +
>>> +                    // Validate event against the existing topology
>>> +                    try {
>>> +                        TopologyManager.acquireReadLock();
>>> +
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        if (service == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Service %s does not exist",
>>> event.getServiceName()));
>>> +                        }
>>> +                        Cluster cluster =
>>> service.getCluster(event.getClusterId());
>>> +                        if (cluster == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Cluster %s does not exist",
>>> event.getClusterId()));
>>> +                        }
>>> +                        Member member =
>>> cluster.getMember(event.getMemberId());
>>> +                        if (member == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Member %s does not exist",
>>> event.getMemberId()));
>>> +                        }
>>> +                        if(member.getStatus() ==
>>> MemberStatus.Activated) {
>>> +                            throw new
>>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>>> already activated", event.getMemberId(), event.getClusterId(),
>>> event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseReadLock();
>>> +                    }
>>> +
>>> +                    // Apply changes to the topology
>>> +                    try {
>>> +                        TopologyManager.acquireWriteLock();
>>> +
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        Cluster cluster =
>>> service.getCluster(event.getClusterId());
>>> +                        Member member =
>>> cluster.getMember(event.getMemberId());
>>> +                        member.setStatus(MemberStatus.Activated);
>>> +                        if (log.isInfoEnabled()) {
>>> +                            log.info(String.format("Member %s
>>> activated in cluster %s of service %s", event.getMemberId(),
>>> event.getClusterId(), event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseWriteLock();
>>> +                    }
>>> +                } else if
>>> (header.getEventClassName().endsWith(MemberSuspendedEvent.class.getName()))
>>> {
>>> +                    // Parse complete message and build event
>>> +                    TopologyEventMessage eventMessage =
>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>> +                    MemberSuspendedEvent event = (MemberSuspendedEvent)
>>> jsonToObject(eventMessage.getBody(), MemberSuspendedEvent.class);
>>> +
>>> +                    // Validate event against the existing topology
>>> +                    try {
>>> +                        TopologyManager.acquireReadLock();
>>> +
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        if (service == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Service %s does not exist",
>>> event.getServiceName()));
>>> +                        }
>>> +                        Cluster cluster =
>>> service.getCluster(event.getClusterId());
>>> +                        if (cluster == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Cluster %s does not exist",
>>> event.getClusterId()));
>>> +                        }
>>> +                        Member member =
>>> cluster.getMember(event.getMemberId());
>>> +                        if (member == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Member %s does not exist",
>>> event.getMemberId()));
>>> +                        }
>>> +                        if(member.getStatus() ==
>>> MemberStatus.Suspended) {
>>> +                            throw new
>>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>>> already suspended", event.getMemberId(), event.getClusterId(),
>>> event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseReadLock();
>>> +                    }
>>> +
>>> +                    // Apply changes to the topology
>>> +                    try {
>>> +                        TopologyManager.acquireWriteLock();
>>> +
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        Cluster cluster =
>>> service.getCluster(event.getClusterId());
>>> +                        Member member =
>>> cluster.getMember(event.getMemberId());
>>> +                        member.setStatus(MemberStatus.Suspended);
>>> +                        if (log.isInfoEnabled()) {
>>> +                            log.info(String.format("Member %s
>>> suspended in cluster %s of service %s", event.getMemberId(),
>>> event.getClusterId(), event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseWriteLock();
>>> +                    }
>>> +                } else if
>>> (header.getEventClassName().endsWith(MemberTerminatedEvent.class.getName()))
>>> {
>>> +                    // Parse complete message and build event
>>> +                    TopologyEventMessage eventMessage =
>>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>>> +                    MemberTerminatedEvent event =
>>> (MemberTerminatedEvent) jsonToObject(eventMessage.getBody(),
>>> MemberTerminatedEvent.class);
>>> +
>>> +                    // Validate event against the existing topology
>>> +                    try {
>>> +                        TopologyManager.acquireReadLock();
>>> +
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        if (service == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Service %s does not exist",
>>> event.getServiceName()));
>>> +                        }
>>> +                        Cluster cluster =
>>> service.getCluster(event.getClusterId());
>>> +                        if (cluster == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Cluster %s does not exist",
>>> event.getClusterId()));
>>> +                        }
>>> +                        Member member =
>>> cluster.getMember(event.getMemberId());
>>> +                        if (member == null) {
>>> +                            throw new
>>> RuntimeException(String.format("Member %s does not exist",
>>> event.getMemberId()));
>>> +                        }
>>> +                        if(member.getStatus() ==
>>> MemberStatus.Terminated) {
>>> +                            throw new
>>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>>> already terminated", event.getMemberId(), event.getClusterId(),
>>> event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseReadLock();
>>> +                    }
>>> +
>>> +                    // Apply changes to the topology
>>> +                    try {
>>> +                        TopologyManager.acquireWriteLock();
>>> +
>>> +                        Service service =
>>> TopologyManager.getTopology().getService(event.getServiceName());
>>> +                        Cluster cluster =
>>> service.getCluster(event.getClusterId());
>>> +                        Member member =
>>> cluster.getMember(event.getMemberId());
>>> +                        member.setStatus(MemberStatus.Terminated);
>>> +                        if (log.isInfoEnabled()) {
>>> +                            log.info(String.format("Member %s
>>> terminated in cluster %s of service %s", event.getMemberId(),
>>> event.getClusterId(), event.getServiceName()));
>>> +                        }
>>> +                    }
>>> +                    finally {
>>> +                        TopologyManager.releaseWriteLock();
>>> +                    }
>>> +                }
>>> +
>>> +            } catch (Exception e) {
>>> +                e.printStackTrace();
>>> +            }
>>> +        }
>>> +    }
>>> +}
>>>
>>>
>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>> ----------------------------------------------------------------------
>>> diff --git
>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>> new file mode 100644
>>> index 0000000..8040314
>>> --- /dev/null
>>> +++
>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>>> @@ -0,0 +1,50 @@
>>> +/*
>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>> + * or more contributor license agreements.  See the NOTICE file
>>> + * distributed with this work for additional information
>>> + * regarding copyright ownership.  The ASF licenses this file
>>> + * to you under the Apache License, Version 2.0 (the
>>> + * "License"); you may not use this file except in compliance
>>> + * with the License.  You may obtain a copy of the License at
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing,
>>> + * software distributed under the License is distributed on an
>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>> + * KIND, either express or implied.  See the License for the
>>> + * specific language governing permissions and limitations
>>> + * under the License.
>>> + */
>>> +package org.apache.stratos.lb.endpoint.topology;
>>> +
>>> +import javax.jms.JMSException;
>>> +import javax.jms.Message;
>>> +import javax.jms.MessageListener;
>>> +import javax.jms.TextMessage;
>>> +
>>> +import org.apache.stratos.lb.endpoint.LoadBalancerContext;
>>> +import org.apache.commons.logging.Log;
>>> +import org.apache.commons.logging.LogFactory;
>>> +
>>> +public class TopologyEventMessageReceiver implements MessageListener {
>>> +
>>> +    private static final Log log =
>>> LogFactory.getLog(TopologyEventMessageReceiver.class);
>>> +
>>> +    @Override
>>> +    public void onMessage(Message message) {
>>> +        if (message instanceof TextMessage) {
>>> +            TextMessage receivedMessage = (TextMessage) message;
>>> +            try {
>>> +                if (log.isDebugEnabled()) {
>>> +                    log.debug("Message received: " + ((TextMessage)
>>> message).getText());
>>> +                }
>>> +                // Add received message to the queue
>>> +
>>>  TopologyEventQueue.getInstance().add(receivedMessage.getText());
>>> +
>>> +            } catch (JMSException e) {
>>> +                log.error(e.getMessage(), e);
>>> +            }
>>> +        }
>>> +    }
>>> +}
>>>
>>>
>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>> ----------------------------------------------------------------------
>>> diff --git
>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>> new file mode 100644
>>> index 0000000..c2cebff
>>> --- /dev/null
>>> +++
>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>>> @@ -0,0 +1,44 @@
>>> +/*
>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>> + * or more contributor license agreements.  See the NOTICE file
>>> + * distributed with this work for additional information
>>> + * regarding copyright ownership.  The ASF licenses this file
>>> + * to you under the Apache License, Version 2.0 (the
>>> + * "License"); you may not use this file except in compliance
>>> + * with the License.  You may obtain a copy of the License at
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing,
>>> + * software distributed under the License is distributed on an
>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>> + * KIND, either express or implied.  See the License for the
>>> + * specific language governing permissions and limitations
>>> + * under the License.
>>> + */
>>> +
>>> +package org.apache.stratos.lb.endpoint.topology;
>>> +
>>> +import java.util.concurrent.BlockingQueue;
>>> +import java.util.concurrent.LinkedBlockingQueue;
>>> +
>>> +/**
>>> + * Implements topology event queue.
>>> + */
>>> +public class TopologyEventQueue extends LinkedBlockingQueue<String>{
>>> +    private static volatile TopologyEventQueue instance;
>>> +
>>> +    private TopologyEventQueue(){
>>> +    }
>>> +
>>> +    public static synchronized TopologyEventQueue getInstance() {
>>> +        if (instance == null) {
>>> +            synchronized (TopologyEventQueue.class){
>>> +                if (instance == null) {
>>> +                    instance = new TopologyEventQueue ();
>>> +                }
>>> +            }
>>> +        }
>>> +        return instance;
>>> +    }
>>> +}
>>>
>>>
>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>> ----------------------------------------------------------------------
>>> diff --git
>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>> new file mode 100644
>>> index 0000000..a27df39
>>> --- /dev/null
>>> +++
>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>>> @@ -0,0 +1,57 @@
>>> +/*
>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>> + * or more contributor license agreements.  See the NOTICE file
>>> + * distributed with this work for additional information
>>> + * regarding copyright ownership.  The ASF licenses this file
>>> + * to you under the Apache License, Version 2.0 (the
>>> + * "License"); you may not use this file except in compliance
>>> + * with the License.  You may obtain a copy of the License at
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing,
>>> + * software distributed under the License is distributed on an
>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>> + * KIND, either express or implied.  See the License for the
>>> + * specific language governing permissions and limitations
>>> + * under the License.
>>> + */
>>> +
>>> +package org.apache.stratos.lb.endpoint.topology;
>>> +
>>> +import org.apache.stratos.messaging.domain.topology.Topology;
>>> +import java.util.concurrent.locks.ReentrantReadWriteLock;
>>> +
>>> +public class TopologyManager {
>>> +    private static volatile Topology topology;
>>> +    private static volatile ReentrantReadWriteLock lock = new
>>> ReentrantReadWriteLock();
>>> +    private static volatile ReentrantReadWriteLock.ReadLock readLock =
>>> lock.readLock();
>>> +    private static volatile ReentrantReadWriteLock.WriteLock writeLock
>>> = lock.writeLock();
>>> +
>>> +    public static void acquireReadLock() {
>>> +        readLock.lock();
>>> +    }
>>> +
>>> +    public static void releaseReadLock() {
>>> +        readLock.unlock();
>>> +    }
>>> +
>>> +    public static void acquireWriteLock() {
>>> +        writeLock.lock();
>>> +    }
>>> +
>>> +    public static void releaseWriteLock() {
>>> +        writeLock.unlock();
>>> +    }
>>> +
>>> +    public static synchronized Topology getTopology() {
>>> +        if (topology == null) {
>>> +            synchronized (TopologyManager.class){
>>> +                if (topology == null) {
>>> +                    topology = new Topology();
>>> +                }
>>> +            }
>>> +        }
>>> +        return topology;
>>> +    }
>>> +}
>>>
>>>
>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>> ----------------------------------------------------------------------
>>> diff --git
>>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>> new file mode 100644
>>> index 0000000..3d19b50
>>> --- /dev/null
>>> +++
>>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>>> @@ -0,0 +1,41 @@
>>> +/*
>>> + * Licensed to the Apache Software Foundation (ASF) under one
>>> + * or more contributor license agreements.  See the NOTICE file
>>> + * distributed with this work for additional information
>>> + * regarding copyright ownership.  The ASF licenses this file
>>> + * to you under the Apache License, Version 2.0 (the
>>> + * "License"); you may not use this file except in compliance
>>> + * with the License.  You may obtain a copy of the License at
>>> + *
>>> + *     http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing,
>>> + * software distributed under the License is distributed on an
>>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>>> + * KIND, either express or implied.  See the License for the
>>> + * specific language governing permissions and limitations
>>> + * under the License.
>>> + */
>>> +
>>> +package org.apache.stratos.lb.endpoint.util;
>>> +
>>> +import org.apache.axis2.clustering.Member;
>>> +import org.apache.stratos.messaging.domain.topology.Port;
>>> +
>>> +/**
>>> + * Implements domain model transformation logic.
>>> + */
>>> +public class Transformer {
>>> +    public static Member
>>> transform(org.apache.stratos.messaging.domain.topology.Member
>>> topologyMember) {
>>> +        Port httpPort = topologyMember.getPort("HTTP");
>>> +        Port httpsPort = topologyMember.getPort("HTTPS");
>>> +
>>> +        Member member = new Member(topologyMember.getHostName(),
>>> httpPort.getValue());
>>> +        member.setDomain(topologyMember.getHostName());
>>> +        member.setHttpPort(httpPort.getValue());
>>> +        member.setHttpsPort(httpsPort.getValue());
>>> +        member.setActive(topologyMember.isActive());
>>> +        member.setProperties(topologyMember.getProperties());
>>> +        return  member;
>>> +    }
>>> +}
>>>
>>>
>>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>> ----------------------------------------------------------------------
>>> diff --git
>>> a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>> b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>> index 471c91a..2e838e9 100644
>>> ---
>>> a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>> +++
>>> b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>>> @@ -43,7 +43,7 @@ public class Service {
>>>          this.serviceName = serviceName;
>>>      }
>>>
>>> -    public Collection<Cluster> getClusterMap() {
>>> +    public Collection<Cluster> getClusters() {
>>>          return clusterMap.values();
>>>      }
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Best Regards,
>>> Nirmal
>>>
>>> Nirmal Fernando.
>>> PPMC Member & Committer of Apache Stratos,
>>> Senior Software Engineer, WSO2 Inc.
>>>
>>> Blog: http://nirmalfdo.blogspot.com/
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Nirmal
>>
>> Nirmal Fernando.
>> PPMC Member & Committer of Apache Stratos,
>> Senior Software Engineer, WSO2 Inc.
>>
>> Blog: http://nirmalfdo.blogspot.com/
>>
>
>
>
> --
> Isuru Perera
> Senior Software Engineer | WSO2, Inc. | http://wso2.com/
> Lean . Enterprise . Middleware
>
> about.me/chrishantha
>



-- 
Best Regards,
Nirmal

Nirmal Fernando.
PPMC Member & Committer of Apache Stratos,
Senior Software Engineer, WSO2 Inc.

Blog: http://nirmalfdo.blogspot.com/

Re: Do not wrap log.info logs with an if condition || Fwd: [1/2] Initial version of the lb endpoint for stratos 4.0.0

Posted by Isuru Perera <is...@wso2.com>.
For me, it is always better to have a guard condition before logging.

Someone might want to enable only ERROR level logs in production.

Speaking of logging, I think we should use SLF4J [1]. I'm just a big fan of
it.

[1] http://www.slf4j.org/


On Fri, Oct 18, 2013 at 3:30 PM, Nirmal Fernando <ni...@gmail.com> wrote:

>
>
>
> On Fri, Oct 18, 2013 at 2:54 PM, Nirmal Fernando <ni...@gmail.com> wrote:
>
>>
>> +
>> +    @Override
>> +    public void run() {
>> +        if (log.isInfoEnabled()) {
>> +            log.info("Topology event message processor started");
>> +            log.info("Waiting for the complete topology event
>> message...");
>> +        }
>>
>> This makes these info logs not appear by default. You need to explicitly
>> specify INFO level, if you want to see these logs.
>>
>
> Above statement is not correct. Apologies!
>
> Still, the suggestion is valid, you do not need to check whether the INFO
> log is enabled IMO.
>
>>
>> So, do not wrap INFO logs with an 'IF'. But wrap DEBUG logs.
>>
>> +        while (true) {
>> +            try {
>> +                // First take the complete topology event
>> +                String json = TopologyEventQueue.getInstance().take();
>> +
>> +                // Read message header and identify event
>> +                EventMessageHeader header = readHeader(json);
>> +                if
>> (header.getEventClassName().equals(CompleteTopologyEvent.class.getName())) {
>> +                    if (log.isInfoEnabled()) {
>> +                        log.info(String.format("Event message received
>> from queue: %s", header.getEventClassName()));
>> +                    }
>> +                    // Parse complete message and build event
>> +                    TopologyEventMessage eventMessage =
>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>> +                    CompleteTopologyEvent event =
>> (CompleteTopologyEvent) jsonToObject(eventMessage.getBody(),
>> CompleteTopologyEvent.class);
>> +
>>  TopologyManager.getTopology().addServices(event.getTopology().getServices());
>> +                    if (log.isInfoEnabled()) {
>> +                        log.info("Topology initialized");
>> +                    }
>> +                    break;
>> +                }
>> +            } catch (Exception e) {
>> +                e.printStackTrace();
>> +            }
>> +        }
>> +
>> +        while (true) {
>> +            try {
>> +                String json = TopologyEventQueue.getInstance().take();
>> +
>> +                // Read message header and identify event
>> +                EventMessageHeader header = readHeader(json);
>> +                if (log.isInfoEnabled()) {
>> +                    log.info(String.format("Event message received from
>> queue: %s", header.getEventClassName()));
>> +                }
>> +
>> +                if
>> (header.getEventClassName().equals(ServiceCreatedEvent.class.getName())) {
>> +                    // Parse complete message and build event
>> +                    TopologyEventMessage eventMessage =
>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>> +                    ServiceCreatedEvent event = (ServiceCreatedEvent)
>> jsonToObject(eventMessage.getBody(), ServiceCreatedEvent.class);
>> +
>> +                    // Validate event against the existing topology
>> +                    try {
>> +                        TopologyManager.acquireReadLock();
>> +                        if
>> (TopologyManager.getTopology().serviceExists(event.getServiceName())) {
>> +                            throw new
>> RuntimeException(String.format("Service %s already exists",
>> event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseReadLock();
>> +                    }
>> +
>> +                    // Apply changes to the topology
>> +                    try {
>> +                        Service service = new Service();
>> +                        service.setServiceName(event.getServiceName());
>> +                        TopologyManager.acquireWriteLock();
>> +
>>  TopologyManager.getTopology().addService(service);
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseWriteLock();
>> +                    }
>> +
>> +                    if (log.isInfoEnabled()) {
>> +                        log.info(String.format("Service %s created",
>> event.getServiceName()));
>> +                    }
>> +                } else if
>> (header.getEventClassName().equals(ServiceRemovedEvent.class.getName())) {
>> +                    // Parse complete message and build event
>> +                    TopologyEventMessage eventMessage =
>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>> +                    ServiceRemovedEvent event = (ServiceRemovedEvent)
>> jsonToObject(eventMessage.getBody(), ServiceRemovedEvent.class);
>> +
>> +                    // Validate event against the existing topology
>> +                    try {
>> +                        TopologyManager.acquireReadLock();
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        if (service == null) {
>> +                            throw new
>> RuntimeException(String.format("Service %s does not exist",
>> event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseReadLock();
>> +                    }
>> +
>> +                    // Apply changes to the topology
>> +                    try {
>> +                        TopologyManager.acquireWriteLock();
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +
>>  TopologyManager.getTopology().removeService(service);
>> +                        if (log.isInfoEnabled()) {
>> +                            log.info(String.format("Service %s
>> removed", event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseWriteLock();
>> +                    }
>> +                } else if
>> (header.getEventClassName().equals(ClusterCreatedEvent.class.getName())) {
>> +                    // Parse complete message and build event
>> +                    TopologyEventMessage eventMessage =
>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>> +                    ClusterCreatedEvent event = (ClusterCreatedEvent)
>> jsonToObject(eventMessage.getBody(), ClusterCreatedEvent.class);
>> +
>> +                    // Validate event against the existing topology
>> +                    try {
>> +                        TopologyManager.acquireReadLock();
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        if (service == null) {
>> +                            throw new
>> RuntimeException(String.format("Service %s does not exist",
>> event.getServiceName()));
>> +                        }
>> +                        if (service.clusterExists(event.getClusterId()))
>> {
>> +                            throw new
>> RuntimeException(String.format("Cluster %s already exists in service %s",
>> event.getClusterId(), event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseReadLock();
>> +                    }
>> +
>> +                    // Apply changes to the topology
>> +                    try {
>> +                        TopologyManager.acquireWriteLock();
>> +                        Cluster cluster = new Cluster();
>> +                        cluster.setClusterId(event.getClusterId());
>> +                        cluster.setHostName(event.getHostName());
>> +                        cluster.setTenantRange(event.getTenantRange());
>> +                        cluster.setCloud(event.getCloud());
>> +                        cluster.setRegion(event.getRegion());
>> +                        cluster.setZone(event.getZone());
>> +
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        service.addCluster(cluster);
>> +                        if (log.isInfoEnabled()) {
>> +                            log.info(String.format("Cluster %s created
>> for service %s", event.getClusterId(), event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseWriteLock();
>> +                    }
>> +                } else if
>> (header.getEventClassName().endsWith(ClusterRemovedEvent.class.getName())) {
>> +                    // Parse complete message and build event
>> +                    TopologyEventMessage eventMessage =
>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>> +                    ClusterRemovedEvent event = (ClusterRemovedEvent)
>> jsonToObject(eventMessage.getBody(), ClusterRemovedEvent.class);
>> +
>> +                    // Validate event against the existing topology
>> +                    try {
>> +                        TopologyManager.acquireReadLock();
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        if (service == null) {
>> +                            throw new
>> RuntimeException(String.format("Service %s does not exist",
>> event.getServiceName()));
>> +                        }
>> +                        if
>> (!service.clusterExists(event.getClusterId())) {
>> +                            throw new
>> RuntimeException(String.format("Cluster %s does not exist in service %s",
>> event.getClusterId(), event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseReadLock();
>> +                    }
>> +
>> +                    // Apply changes to the topology
>> +                    try {
>> +                        TopologyManager.acquireWriteLock();
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        service.removeCluster(event.getClusterId());
>> +
>> +                        if (log.isInfoEnabled()) {
>> +                            log.info(String.format("Cluster %s removed
>> from service %s", event.getClusterId(), event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseWriteLock();
>> +                    }
>> +                } else if
>> (header.getEventClassName().endsWith(MemberStartedEvent.class.getName())) {
>> +                    // Parse complete message and build event
>> +                    TopologyEventMessage eventMessage =
>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>> +                    MemberStartedEvent event = (MemberStartedEvent)
>> jsonToObject(eventMessage.getBody(), MemberStartedEvent.class);
>> +
>> +                    // Validate event against the existing topology
>> +                    try {
>> +                        TopologyManager.acquireReadLock();
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        if (service == null) {
>> +                            throw new
>> RuntimeException(String.format("Service %s does not exist",
>> event.getServiceName()));
>> +                        }
>> +                        Cluster cluster =
>> service.getCluster(event.getClusterId());
>> +                        if (cluster == null) {
>> +                            throw new
>> RuntimeException(String.format("Cluster %s does not exist",
>> event.getClusterId()));
>> +                        }
>> +                        if (cluster.memberExists(event.getMemberId())) {
>> +                            throw new
>> RuntimeException(String.format("Member %s already exist in cluster %s of
>> service %s", event.getMemberId(), event.getClusterId(),
>> event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseReadLock();
>> +                    }
>> +
>> +                    // Apply changes to the topology
>> +                    try {
>> +                        TopologyManager.acquireWriteLock();
>> +
>> +                        Member member = new Member();
>> +                        member.setServiceName(event.getServiceName());
>> +                        member.setClusterId(event.getClusterId());
>> +                        member.setMemberId(event.getMemberId());
>> +                        member.setHostName(event.getHostName());
>> +                        member.setStatus(MemberStatus.Starting);
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        Cluster cluster =
>> service.getCluster(event.getClusterId());
>> +                        cluster.addMember(member);
>> +
>> +                        if (log.isInfoEnabled()) {
>> +                            log.info(String.format("Member %s started
>> in cluster %s of service %s", event.getMemberId(), event.getClusterId(),
>> event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseWriteLock();
>> +                    }
>> +                } else if
>> (header.getEventClassName().endsWith(MemberActivatedEvent.class.getName()))
>> {
>> +                    // Parse complete message and build event
>> +                    TopologyEventMessage eventMessage =
>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>> +                    MemberActivatedEvent event = (MemberActivatedEvent)
>> jsonToObject(eventMessage.getBody(), MemberActivatedEvent.class);
>> +
>> +                    // Validate event against the existing topology
>> +                    try {
>> +                        TopologyManager.acquireReadLock();
>> +
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        if (service == null) {
>> +                            throw new
>> RuntimeException(String.format("Service %s does not exist",
>> event.getServiceName()));
>> +                        }
>> +                        Cluster cluster =
>> service.getCluster(event.getClusterId());
>> +                        if (cluster == null) {
>> +                            throw new
>> RuntimeException(String.format("Cluster %s does not exist",
>> event.getClusterId()));
>> +                        }
>> +                        Member member =
>> cluster.getMember(event.getMemberId());
>> +                        if (member == null) {
>> +                            throw new
>> RuntimeException(String.format("Member %s does not exist",
>> event.getMemberId()));
>> +                        }
>> +                        if(member.getStatus() == MemberStatus.Activated)
>> {
>> +                            throw new
>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>> already activated", event.getMemberId(), event.getClusterId(),
>> event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseReadLock();
>> +                    }
>> +
>> +                    // Apply changes to the topology
>> +                    try {
>> +                        TopologyManager.acquireWriteLock();
>> +
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        Cluster cluster =
>> service.getCluster(event.getClusterId());
>> +                        Member member =
>> cluster.getMember(event.getMemberId());
>> +                        member.setStatus(MemberStatus.Activated);
>> +                        if (log.isInfoEnabled()) {
>> +                            log.info(String.format("Member %s activated
>> in cluster %s of service %s", event.getMemberId(), event.getClusterId(),
>> event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseWriteLock();
>> +                    }
>> +                } else if
>> (header.getEventClassName().endsWith(MemberSuspendedEvent.class.getName()))
>> {
>> +                    // Parse complete message and build event
>> +                    TopologyEventMessage eventMessage =
>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>> +                    MemberSuspendedEvent event = (MemberSuspendedEvent)
>> jsonToObject(eventMessage.getBody(), MemberSuspendedEvent.class);
>> +
>> +                    // Validate event against the existing topology
>> +                    try {
>> +                        TopologyManager.acquireReadLock();
>> +
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        if (service == null) {
>> +                            throw new
>> RuntimeException(String.format("Service %s does not exist",
>> event.getServiceName()));
>> +                        }
>> +                        Cluster cluster =
>> service.getCluster(event.getClusterId());
>> +                        if (cluster == null) {
>> +                            throw new
>> RuntimeException(String.format("Cluster %s does not exist",
>> event.getClusterId()));
>> +                        }
>> +                        Member member =
>> cluster.getMember(event.getMemberId());
>> +                        if (member == null) {
>> +                            throw new
>> RuntimeException(String.format("Member %s does not exist",
>> event.getMemberId()));
>> +                        }
>> +                        if(member.getStatus() == MemberStatus.Suspended)
>> {
>> +                            throw new
>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>> already suspended", event.getMemberId(), event.getClusterId(),
>> event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseReadLock();
>> +                    }
>> +
>> +                    // Apply changes to the topology
>> +                    try {
>> +                        TopologyManager.acquireWriteLock();
>> +
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        Cluster cluster =
>> service.getCluster(event.getClusterId());
>> +                        Member member =
>> cluster.getMember(event.getMemberId());
>> +                        member.setStatus(MemberStatus.Suspended);
>> +                        if (log.isInfoEnabled()) {
>> +                            log.info(String.format("Member %s suspended
>> in cluster %s of service %s", event.getMemberId(), event.getClusterId(),
>> event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseWriteLock();
>> +                    }
>> +                } else if
>> (header.getEventClassName().endsWith(MemberTerminatedEvent.class.getName()))
>> {
>> +                    // Parse complete message and build event
>> +                    TopologyEventMessage eventMessage =
>> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
>> +                    MemberTerminatedEvent event =
>> (MemberTerminatedEvent) jsonToObject(eventMessage.getBody(),
>> MemberTerminatedEvent.class);
>> +
>> +                    // Validate event against the existing topology
>> +                    try {
>> +                        TopologyManager.acquireReadLock();
>> +
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        if (service == null) {
>> +                            throw new
>> RuntimeException(String.format("Service %s does not exist",
>> event.getServiceName()));
>> +                        }
>> +                        Cluster cluster =
>> service.getCluster(event.getClusterId());
>> +                        if (cluster == null) {
>> +                            throw new
>> RuntimeException(String.format("Cluster %s does not exist",
>> event.getClusterId()));
>> +                        }
>> +                        Member member =
>> cluster.getMember(event.getMemberId());
>> +                        if (member == null) {
>> +                            throw new
>> RuntimeException(String.format("Member %s does not exist",
>> event.getMemberId()));
>> +                        }
>> +                        if(member.getStatus() ==
>> MemberStatus.Terminated) {
>> +                            throw new
>> RuntimeException(String.format("Member %s of cluster %s of service %s is
>> already terminated", event.getMemberId(), event.getClusterId(),
>> event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseReadLock();
>> +                    }
>> +
>> +                    // Apply changes to the topology
>> +                    try {
>> +                        TopologyManager.acquireWriteLock();
>> +
>> +                        Service service =
>> TopologyManager.getTopology().getService(event.getServiceName());
>> +                        Cluster cluster =
>> service.getCluster(event.getClusterId());
>> +                        Member member =
>> cluster.getMember(event.getMemberId());
>> +                        member.setStatus(MemberStatus.Terminated);
>> +                        if (log.isInfoEnabled()) {
>> +                            log.info(String.format("Member %s
>> terminated in cluster %s of service %s", event.getMemberId(),
>> event.getClusterId(), event.getServiceName()));
>> +                        }
>> +                    }
>> +                    finally {
>> +                        TopologyManager.releaseWriteLock();
>> +                    }
>> +                }
>> +
>> +            } catch (Exception e) {
>> +                e.printStackTrace();
>> +            }
>> +        }
>> +    }
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>> new file mode 100644
>> index 0000000..8040314
>> --- /dev/null
>> +++
>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
>> @@ -0,0 +1,50 @@
>> +/*
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +package org.apache.stratos.lb.endpoint.topology;
>> +
>> +import javax.jms.JMSException;
>> +import javax.jms.Message;
>> +import javax.jms.MessageListener;
>> +import javax.jms.TextMessage;
>> +
>> +import org.apache.stratos.lb.endpoint.LoadBalancerContext;
>> +import org.apache.commons.logging.Log;
>> +import org.apache.commons.logging.LogFactory;
>> +
>> +public class TopologyEventMessageReceiver implements MessageListener {
>> +
>> +    private static final Log log =
>> LogFactory.getLog(TopologyEventMessageReceiver.class);
>> +
>> +    @Override
>> +    public void onMessage(Message message) {
>> +        if (message instanceof TextMessage) {
>> +            TextMessage receivedMessage = (TextMessage) message;
>> +            try {
>> +                if (log.isDebugEnabled()) {
>> +                    log.debug("Message received: " + ((TextMessage)
>> message).getText());
>> +                }
>> +                // Add received message to the queue
>> +
>>  TopologyEventQueue.getInstance().add(receivedMessage.getText());
>> +
>> +            } catch (JMSException e) {
>> +                log.error(e.getMessage(), e);
>> +            }
>> +        }
>> +    }
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>> new file mode 100644
>> index 0000000..c2cebff
>> --- /dev/null
>> +++
>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
>> @@ -0,0 +1,44 @@
>> +/*
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +
>> +package org.apache.stratos.lb.endpoint.topology;
>> +
>> +import java.util.concurrent.BlockingQueue;
>> +import java.util.concurrent.LinkedBlockingQueue;
>> +
>> +/**
>> + * Implements topology event queue.
>> + */
>> +public class TopologyEventQueue extends LinkedBlockingQueue<String>{
>> +    private static volatile TopologyEventQueue instance;
>> +
>> +    private TopologyEventQueue(){
>> +    }
>> +
>> +    public static synchronized TopologyEventQueue getInstance() {
>> +        if (instance == null) {
>> +            synchronized (TopologyEventQueue.class){
>> +                if (instance == null) {
>> +                    instance = new TopologyEventQueue ();
>> +                }
>> +            }
>> +        }
>> +        return instance;
>> +    }
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>> new file mode 100644
>> index 0000000..a27df39
>> --- /dev/null
>> +++
>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
>> @@ -0,0 +1,57 @@
>> +/*
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +
>> +package org.apache.stratos.lb.endpoint.topology;
>> +
>> +import org.apache.stratos.messaging.domain.topology.Topology;
>> +import java.util.concurrent.locks.ReentrantReadWriteLock;
>> +
>> +public class TopologyManager {
>> +    private static volatile Topology topology;
>> +    private static volatile ReentrantReadWriteLock lock = new
>> ReentrantReadWriteLock();
>> +    private static volatile ReentrantReadWriteLock.ReadLock readLock =
>> lock.readLock();
>> +    private static volatile ReentrantReadWriteLock.WriteLock writeLock =
>> lock.writeLock();
>> +
>> +    public static void acquireReadLock() {
>> +        readLock.lock();
>> +    }
>> +
>> +    public static void releaseReadLock() {
>> +        readLock.unlock();
>> +    }
>> +
>> +    public static void acquireWriteLock() {
>> +        writeLock.lock();
>> +    }
>> +
>> +    public static void releaseWriteLock() {
>> +        writeLock.unlock();
>> +    }
>> +
>> +    public static synchronized Topology getTopology() {
>> +        if (topology == null) {
>> +            synchronized (TopologyManager.class){
>> +                if (topology == null) {
>> +                    topology = new Topology();
>> +                }
>> +            }
>> +        }
>> +        return topology;
>> +    }
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>> new file mode 100644
>> index 0000000..3d19b50
>> --- /dev/null
>> +++
>> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
>> @@ -0,0 +1,41 @@
>> +/*
>> + * Licensed to the Apache Software Foundation (ASF) under one
>> + * or more contributor license agreements.  See the NOTICE file
>> + * distributed with this work for additional information
>> + * regarding copyright ownership.  The ASF licenses this file
>> + * to you under the Apache License, Version 2.0 (the
>> + * "License"); you may not use this file except in compliance
>> + * with the License.  You may obtain a copy of the License at
>> + *
>> + *     http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing,
>> + * software distributed under the License is distributed on an
>> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> + * KIND, either express or implied.  See the License for the
>> + * specific language governing permissions and limitations
>> + * under the License.
>> + */
>> +
>> +package org.apache.stratos.lb.endpoint.util;
>> +
>> +import org.apache.axis2.clustering.Member;
>> +import org.apache.stratos.messaging.domain.topology.Port;
>> +
>> +/**
>> + * Implements domain model transformation logic.
>> + */
>> +public class Transformer {
>> +    public static Member
>> transform(org.apache.stratos.messaging.domain.topology.Member
>> topologyMember) {
>> +        Port httpPort = topologyMember.getPort("HTTP");
>> +        Port httpsPort = topologyMember.getPort("HTTPS");
>> +
>> +        Member member = new Member(topologyMember.getHostName(),
>> httpPort.getValue());
>> +        member.setDomain(topologyMember.getHostName());
>> +        member.setHttpPort(httpPort.getValue());
>> +        member.setHttpsPort(httpsPort.getValue());
>> +        member.setActive(topologyMember.isActive());
>> +        member.setProperties(topologyMember.getProperties());
>> +        return  member;
>> +    }
>> +}
>>
>>
>> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>> ----------------------------------------------------------------------
>> diff --git
>> a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>> b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>> index 471c91a..2e838e9 100644
>> ---
>> a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>> +++
>> b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
>> @@ -43,7 +43,7 @@ public class Service {
>>          this.serviceName = serviceName;
>>      }
>>
>> -    public Collection<Cluster> getClusterMap() {
>> +    public Collection<Cluster> getClusters() {
>>          return clusterMap.values();
>>      }
>>
>>
>>
>>
>>
>> --
>> Best Regards,
>> Nirmal
>>
>> Nirmal Fernando.
>> PPMC Member & Committer of Apache Stratos,
>> Senior Software Engineer, WSO2 Inc.
>>
>> Blog: http://nirmalfdo.blogspot.com/
>>
>
>
>
> --
> Best Regards,
> Nirmal
>
> Nirmal Fernando.
> PPMC Member & Committer of Apache Stratos,
> Senior Software Engineer, WSO2 Inc.
>
> Blog: http://nirmalfdo.blogspot.com/
>



-- 
Isuru Perera
Senior Software Engineer | WSO2, Inc. | http://wso2.com/
Lean . Enterprise . Middleware

about.me/chrishantha

Re: Do not wrap log.info logs with an if condition || Fwd: [1/2] Initial version of the lb endpoint for stratos 4.0.0

Posted by Nirmal Fernando <ni...@gmail.com>.
On Fri, Oct 18, 2013 at 2:54 PM, Nirmal Fernando <ni...@gmail.com> wrote:

>
> +
> +    @Override
> +    public void run() {
> +        if (log.isInfoEnabled()) {
> +            log.info("Topology event message processor started");
> +            log.info("Waiting for the complete topology event
> message...");
> +        }
>
> This makes these info logs not appear by default. You need to explicitly
> specify INFO level, if you want to see these logs.
>

The above statement is not correct. Apologies!

Still, the suggestion is valid: IMO, you do not need to check whether the
INFO log level is enabled before logging at INFO.

>
> So, do not wrap INFO logs with an 'IF'. But wrap DEBUG logs.
>
> +        while (true) {
> +            try {
> +                // First take the complete topology event
> +                String json = TopologyEventQueue.getInstance().take();
> +
> +                // Read message header and identify event
> +                EventMessageHeader header = readHeader(json);
> +                if
> (header.getEventClassName().equals(CompleteTopologyEvent.class.getName())) {
> +                    if (log.isInfoEnabled()) {
> +                        log.info(String.format("Event message received
> from queue: %s", header.getEventClassName()));
> +                    }
> +                    // Parse complete message and build event
> +                    TopologyEventMessage eventMessage =
> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
> +                    CompleteTopologyEvent event = (CompleteTopologyEvent)
> jsonToObject(eventMessage.getBody(), CompleteTopologyEvent.class);
> +
>  TopologyManager.getTopology().addServices(event.getTopology().getServices());
> +                    if (log.isInfoEnabled()) {
> +                        log.info("Topology initialized");
> +                    }
> +                    break;
> +                }
> +            } catch (Exception e) {
> +                e.printStackTrace();
> +            }
> +        }
> +
> +        while (true) {
> +            try {
> +                String json = TopologyEventQueue.getInstance().take();
> +
> +                // Read message header and identify event
> +                EventMessageHeader header = readHeader(json);
> +                if (log.isInfoEnabled()) {
> +                    log.info(String.format("Event message received from
> queue: %s", header.getEventClassName()));
> +                }
> +
> +                if
> (header.getEventClassName().equals(ServiceCreatedEvent.class.getName())) {
> +                    // Parse complete message and build event
> +                    TopologyEventMessage eventMessage =
> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
> +                    ServiceCreatedEvent event = (ServiceCreatedEvent)
> jsonToObject(eventMessage.getBody(), ServiceCreatedEvent.class);
> +
> +                    // Validate event against the existing topology
> +                    try {
> +                        TopologyManager.acquireReadLock();
> +                        if
> (TopologyManager.getTopology().serviceExists(event.getServiceName())) {
> +                            throw new
> RuntimeException(String.format("Service %s already exists",
> event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseReadLock();
> +                    }
> +
> +                    // Apply changes to the topology
> +                    try {
> +                        Service service = new Service();
> +                        service.setServiceName(event.getServiceName());
> +                        TopologyManager.acquireWriteLock();
> +                        TopologyManager.getTopology().addService(service);
> +                    }
> +                    finally {
> +                        TopologyManager.releaseWriteLock();
> +                    }
> +
> +                    if (log.isInfoEnabled()) {
> +                        log.info(String.format("Service %s created",
> event.getServiceName()));
> +                    }
> +                } else if
> (header.getEventClassName().equals(ServiceRemovedEvent.class.getName())) {
> +                    // Parse complete message and build event
> +                    TopologyEventMessage eventMessage =
> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
> +                    ServiceRemovedEvent event = (ServiceRemovedEvent)
> jsonToObject(eventMessage.getBody(), ServiceRemovedEvent.class);
> +
> +                    // Validate event against the existing topology
> +                    try {
> +                        TopologyManager.acquireReadLock();
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        if (service == null) {
> +                            throw new
> RuntimeException(String.format("Service %s does not exist",
> event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseReadLock();
> +                    }
> +
> +                    // Apply changes to the topology
> +                    try {
> +                        TopologyManager.acquireWriteLock();
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +
>  TopologyManager.getTopology().removeService(service);
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Service %s removed",
> event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseWriteLock();
> +                    }
> +                } else if
> (header.getEventClassName().equals(ClusterCreatedEvent.class.getName())) {
> +                    // Parse complete message and build event
> +                    TopologyEventMessage eventMessage =
> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
> +                    ClusterCreatedEvent event = (ClusterCreatedEvent)
> jsonToObject(eventMessage.getBody(), ClusterCreatedEvent.class);
> +
> +                    // Validate event against the existing topology
> +                    try {
> +                        TopologyManager.acquireReadLock();
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        if (service == null) {
> +                            throw new
> RuntimeException(String.format("Service %s does not exist",
> event.getServiceName()));
> +                        }
> +                        if (service.clusterExists(event.getClusterId())) {
> +                            throw new
> RuntimeException(String.format("Cluster %s already exists in service %s",
> event.getClusterId(), event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseReadLock();
> +                    }
> +
> +                    // Apply changes to the topology
> +                    try {
> +                        TopologyManager.acquireWriteLock();
> +                        Cluster cluster = new Cluster();
> +                        cluster.setClusterId(event.getClusterId());
> +                        cluster.setHostName(event.getHostName());
> +                        cluster.setTenantRange(event.getTenantRange());
> +                        cluster.setCloud(event.getCloud());
> +                        cluster.setRegion(event.getRegion());
> +                        cluster.setZone(event.getZone());
> +
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        service.addCluster(cluster);
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Cluster %s created
> for service %s", event.getClusterId(), event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseWriteLock();
> +                    }
> +                } else if
> (header.getEventClassName().endsWith(ClusterRemovedEvent.class.getName())) {
> +                    // Parse complete message and build event
> +                    TopologyEventMessage eventMessage =
> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
> +                    ClusterRemovedEvent event = (ClusterRemovedEvent)
> jsonToObject(eventMessage.getBody(), ClusterRemovedEvent.class);
> +
> +                    // Validate event against the existing topology
> +                    try {
> +                        TopologyManager.acquireReadLock();
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        if (service == null) {
> +                            throw new
> RuntimeException(String.format("Service %s does not exist",
> event.getServiceName()));
> +                        }
> +                        if (!service.clusterExists(event.getClusterId()))
> {
> +                            throw new
> RuntimeException(String.format("Cluster %s does not exist in service %s",
> event.getClusterId(), event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseReadLock();
> +                    }
> +
> +                    // Apply changes to the topology
> +                    try {
> +                        TopologyManager.acquireWriteLock();
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        service.removeCluster(event.getClusterId());
> +
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Cluster %s removed
> from service %s", event.getClusterId(), event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseWriteLock();
> +                    }
> +                } else if
> (header.getEventClassName().endsWith(MemberStartedEvent.class.getName())) {
> +                    // Parse complete message and build event
> +                    TopologyEventMessage eventMessage =
> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
> +                    MemberStartedEvent event = (MemberStartedEvent)
> jsonToObject(eventMessage.getBody(), MemberStartedEvent.class);
> +
> +                    // Validate event against the existing topology
> +                    try {
> +                        TopologyManager.acquireReadLock();
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        if (service == null) {
> +                            throw new
> RuntimeException(String.format("Service %s does not exist",
> event.getServiceName()));
> +                        }
> +                        Cluster cluster =
> service.getCluster(event.getClusterId());
> +                        if (cluster == null) {
> +                            throw new
> RuntimeException(String.format("Cluster %s does not exist",
> event.getClusterId()));
> +                        }
> +                        if (cluster.memberExists(event.getMemberId())) {
> +                            throw new
> RuntimeException(String.format("Member %s already exist in cluster %s of
> service %s", event.getMemberId(), event.getClusterId(),
> event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseReadLock();
> +                    }
> +
> +                    // Apply changes to the topology
> +                    try {
> +                        TopologyManager.acquireWriteLock();
> +
> +                        Member member = new Member();
> +                        member.setServiceName(event.getServiceName());
> +                        member.setClusterId(event.getClusterId());
> +                        member.setMemberId(event.getMemberId());
> +                        member.setHostName(event.getHostName());
> +                        member.setStatus(MemberStatus.Starting);
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        Cluster cluster =
> service.getCluster(event.getClusterId());
> +                        cluster.addMember(member);
> +
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Member %s started in
> cluster %s of service %s", event.getMemberId(), event.getClusterId(),
> event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseWriteLock();
> +                    }
> +                } else if
> (header.getEventClassName().endsWith(MemberActivatedEvent.class.getName()))
> {
> +                    // Parse complete message and build event
> +                    TopologyEventMessage eventMessage =
> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
> +                    MemberActivatedEvent event = (MemberActivatedEvent)
> jsonToObject(eventMessage.getBody(), MemberActivatedEvent.class);
> +
> +                    // Validate event against the existing topology
> +                    try {
> +                        TopologyManager.acquireReadLock();
> +
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        if (service == null) {
> +                            throw new
> RuntimeException(String.format("Service %s does not exist",
> event.getServiceName()));
> +                        }
> +                        Cluster cluster =
> service.getCluster(event.getClusterId());
> +                        if (cluster == null) {
> +                            throw new
> RuntimeException(String.format("Cluster %s does not exist",
> event.getClusterId()));
> +                        }
> +                        Member member =
> cluster.getMember(event.getMemberId());
> +                        if (member == null) {
> +                            throw new
> RuntimeException(String.format("Member %s does not exist",
> event.getMemberId()));
> +                        }
> +                        if(member.getStatus() == MemberStatus.Activated) {
> +                            throw new
> RuntimeException(String.format("Member %s of cluster %s of service %s is
> already activated", event.getMemberId(), event.getClusterId(),
> event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseReadLock();
> +                    }
> +
> +                    // Apply changes to the topology
> +                    try {
> +                        TopologyManager.acquireWriteLock();
> +
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        Cluster cluster =
> service.getCluster(event.getClusterId());
> +                        Member member =
> cluster.getMember(event.getMemberId());
> +                        member.setStatus(MemberStatus.Activated);
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Member %s activated
> in cluster %s of service %s", event.getMemberId(), event.getClusterId(),
> event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseWriteLock();
> +                    }
> +                } else if
> (header.getEventClassName().endsWith(MemberSuspendedEvent.class.getName()))
> {
> +                    // Parse complete message and build event
> +                    TopologyEventMessage eventMessage =
> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
> +                    MemberSuspendedEvent event = (MemberSuspendedEvent)
> jsonToObject(eventMessage.getBody(), MemberSuspendedEvent.class);
> +
> +                    // Validate event against the existing topology
> +                    try {
> +                        TopologyManager.acquireReadLock();
> +
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        if (service == null) {
> +                            throw new
> RuntimeException(String.format("Service %s does not exist",
> event.getServiceName()));
> +                        }
> +                        Cluster cluster =
> service.getCluster(event.getClusterId());
> +                        if (cluster == null) {
> +                            throw new
> RuntimeException(String.format("Cluster %s does not exist",
> event.getClusterId()));
> +                        }
> +                        Member member =
> cluster.getMember(event.getMemberId());
> +                        if (member == null) {
> +                            throw new
> RuntimeException(String.format("Member %s does not exist",
> event.getMemberId()));
> +                        }
> +                        if(member.getStatus() == MemberStatus.Suspended) {
> +                            throw new
> RuntimeException(String.format("Member %s of cluster %s of service %s is
> already suspended", event.getMemberId(), event.getClusterId(),
> event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseReadLock();
> +                    }
> +
> +                    // Apply changes to the topology
> +                    try {
> +                        TopologyManager.acquireWriteLock();
> +
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        Cluster cluster =
> service.getCluster(event.getClusterId());
> +                        Member member =
> cluster.getMember(event.getMemberId());
> +                        member.setStatus(MemberStatus.Suspended);
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Member %s suspended
> in cluster %s of service %s", event.getMemberId(), event.getClusterId(),
> event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseWriteLock();
> +                    }
> +                } else if
> (header.getEventClassName().endsWith(MemberTerminatedEvent.class.getName()))
> {
> +                    // Parse complete message and build event
> +                    TopologyEventMessage eventMessage =
> (TopologyEventMessage) jsonToObject(json, TopologyEventMessage.class);
> +                    MemberTerminatedEvent event = (MemberTerminatedEvent)
> jsonToObject(eventMessage.getBody(), MemberTerminatedEvent.class);
> +
> +                    // Validate event against the existing topology
> +                    try {
> +                        TopologyManager.acquireReadLock();
> +
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        if (service == null) {
> +                            throw new
> RuntimeException(String.format("Service %s does not exist",
> event.getServiceName()));
> +                        }
> +                        Cluster cluster =
> service.getCluster(event.getClusterId());
> +                        if (cluster == null) {
> +                            throw new
> RuntimeException(String.format("Cluster %s does not exist",
> event.getClusterId()));
> +                        }
> +                        Member member =
> cluster.getMember(event.getMemberId());
> +                        if (member == null) {
> +                            throw new
> RuntimeException(String.format("Member %s does not exist",
> event.getMemberId()));
> +                        }
> +                        if(member.getStatus() == MemberStatus.Terminated)
> {
> +                            throw new
> RuntimeException(String.format("Member %s of cluster %s of service %s is
> already terminated", event.getMemberId(), event.getClusterId(),
> event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseReadLock();
> +                    }
> +
> +                    // Apply changes to the topology
> +                    try {
> +                        TopologyManager.acquireWriteLock();
> +
> +                        Service service =
> TopologyManager.getTopology().getService(event.getServiceName());
> +                        Cluster cluster =
> service.getCluster(event.getClusterId());
> +                        Member member =
> cluster.getMember(event.getMemberId());
> +                        member.setStatus(MemberStatus.Terminated);
> +                        if (log.isInfoEnabled()) {
> +                            log.info(String.format("Member %s terminated
> in cluster %s of service %s", event.getMemberId(), event.getClusterId(),
> event.getServiceName()));
> +                        }
> +                    }
> +                    finally {
> +                        TopologyManager.releaseWriteLock();
> +                    }
> +                }
> +
> +            } catch (Exception e) {
> +                e.printStackTrace();
> +            }
> +        }
> +    }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
> new file mode 100644
> index 0000000..8040314
> --- /dev/null
> +++
> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventMessageReceiver.java
> @@ -0,0 +1,50 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +package org.apache.stratos.lb.endpoint.topology;
> +
> +import javax.jms.JMSException;
> +import javax.jms.Message;
> +import javax.jms.MessageListener;
> +import javax.jms.TextMessage;
> +
> +import org.apache.stratos.lb.endpoint.LoadBalancerContext;
> +import org.apache.commons.logging.Log;
> +import org.apache.commons.logging.LogFactory;
> +
> +public class TopologyEventMessageReceiver implements MessageListener {
> +
> +    private static final Log log =
> LogFactory.getLog(TopologyEventMessageReceiver.class);
> +
> +    @Override
> +    public void onMessage(Message message) {
> +        if (message instanceof TextMessage) {
> +            TextMessage receivedMessage = (TextMessage) message;
> +            try {
> +                if (log.isDebugEnabled()) {
> +                    log.debug("Message received: " + ((TextMessage)
> message).getText());
> +                }
> +                // Add received message to the queue
> +                TopologyEventQueue.getInstance().add(receivedMessage.getText());
> +
> +            } catch (JMSException e) {
> +                log.error(e.getMessage(), e);
> +            }
> +        }
> +    }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
> new file mode 100644
> index 0000000..c2cebff
> --- /dev/null
> +++
> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyEventQueue.java
> @@ -0,0 +1,44 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package org.apache.stratos.lb.endpoint.topology;
> +
> +import java.util.concurrent.BlockingQueue;
> +import java.util.concurrent.LinkedBlockingQueue;
> +
> +/**
> + * Implements topology event queue.
> + */
> +public class TopologyEventQueue extends LinkedBlockingQueue<String>{
> +    private static volatile TopologyEventQueue instance;
> +
> +    private TopologyEventQueue(){
> +    }
> +
> +    public static synchronized TopologyEventQueue getInstance() {
> +        if (instance == null) {
> +            synchronized (TopologyEventQueue.class){
> +                if (instance == null) {
> +                    instance = new TopologyEventQueue ();
> +                }
> +            }
> +        }
> +        return instance;
> +    }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
> new file mode 100644
> index 0000000..a27df39
> --- /dev/null
> +++
> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/topology/TopologyManager.java
> @@ -0,0 +1,57 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package org.apache.stratos.lb.endpoint.topology;
> +
> +import org.apache.stratos.messaging.domain.topology.Topology;
> +import java.util.concurrent.locks.ReentrantReadWriteLock;
> +
> +public class TopologyManager {
> +    private static volatile Topology topology;
> +    private static volatile ReentrantReadWriteLock lock = new
> ReentrantReadWriteLock();
> +    private static volatile ReentrantReadWriteLock.ReadLock readLock =
> lock.readLock();
> +    private static volatile ReentrantReadWriteLock.WriteLock writeLock =
> lock.writeLock();
> +
> +    public static void acquireReadLock() {
> +        readLock.lock();
> +    }
> +
> +    public static void releaseReadLock() {
> +        readLock.unlock();
> +    }
> +
> +    public static void acquireWriteLock() {
> +        writeLock.lock();
> +    }
> +
> +    public static void releaseWriteLock() {
> +        writeLock.unlock();
> +    }
> +
> +    public static synchronized Topology getTopology() {
> +        if (topology == null) {
> +            synchronized (TopologyManager.class){
> +                if (topology == null) {
> +                    topology = new Topology();
> +                }
> +            }
> +        }
> +        return topology;
> +    }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
> new file mode 100644
> index 0000000..3d19b50
> --- /dev/null
> +++
> b/components/org.apache.stratos.lb.endpoint/src/main/java/org/apache/stratos/lb/endpoint/util/Transformer.java
> @@ -0,0 +1,41 @@
> +/*
> + * Licensed to the Apache Software Foundation (ASF) under one
> + * or more contributor license agreements.  See the NOTICE file
> + * distributed with this work for additional information
> + * regarding copyright ownership.  The ASF licenses this file
> + * to you under the Apache License, Version 2.0 (the
> + * "License"); you may not use this file except in compliance
> + * with the License.  You may obtain a copy of the License at
> + *
> + *     http://www.apache.org/licenses/LICENSE-2.0
> + *
> + * Unless required by applicable law or agreed to in writing,
> + * software distributed under the License is distributed on an
> + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> + * KIND, either express or implied.  See the License for the
> + * specific language governing permissions and limitations
> + * under the License.
> + */
> +
> +package org.apache.stratos.lb.endpoint.util;
> +
> +import org.apache.axis2.clustering.Member;
> +import org.apache.stratos.messaging.domain.topology.Port;
> +
> +/**
> + * Implements domain model transformation logic.
> + */
> +public class Transformer {
> +    public static Member
> transform(org.apache.stratos.messaging.domain.topology.Member
> topologyMember) {
> +        Port httpPort = topologyMember.getPort("HTTP");
> +        Port httpsPort = topologyMember.getPort("HTTPS");
> +
> +        Member member = new Member(topologyMember.getHostName(),
> httpPort.getValue());
> +        member.setDomain(topologyMember.getHostName());
> +        member.setHttpPort(httpPort.getValue());
> +        member.setHttpsPort(httpsPort.getValue());
> +        member.setActive(topologyMember.isActive());
> +        member.setProperties(topologyMember.getProperties());
> +        return  member;
> +    }
> +}
>
>
> http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bff99291/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
> ----------------------------------------------------------------------
> diff --git
> a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
> b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
> index 471c91a..2e838e9 100644
> ---
> a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
> +++
> b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
> @@ -43,7 +43,7 @@ public class Service {
>          this.serviceName = serviceName;
>      }
>
> -    public Collection<Cluster> getClusterMap() {
> +    public Collection<Cluster> getClusters() {
>          return clusterMap.values();
>      }
>
>
>
>
>
> --
> Best Regards,
> Nirmal
>
> Nirmal Fernando.
> PPMC Member & Committer of Apache Stratos,
> Senior Software Engineer, WSO2 Inc.
>
> Blog: http://nirmalfdo.blogspot.com/
>



-- 
Best Regards,
Nirmal

Nirmal Fernando.
PPMC Member & Committer of Apache Stratos,
Senior Software Engineer, WSO2 Inc.

Blog: http://nirmalfdo.blogspot.com/