You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2015/03/03 14:14:21 UTC
[3/3] stratos git commit: Introducing load balancer topology provider
model to be able to provide the topology required for load balancing in a
generic manner
Introducing load balancer topology provider model to be able to provide the topology required for load balancing in a generic manner
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/c799abce
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/c799abce
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/c799abce
Branch: refs/heads/master
Commit: c799abceb9e8b0086bc2603cc36377972aec79ae
Parents: d765594
Author: Imesh Gunaratne <im...@apache.org>
Authored: Tue Mar 3 18:43:57 2015 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Tue Mar 3 18:44:15 2015 +0530
----------------------------------------------------------------------
.../load/balancer/common/domain/Cluster.java | 159 +++++++
.../load/balancer/common/domain/Member.java | 87 ++++
.../load/balancer/common/domain/Port.java | 53 +++
.../load/balancer/common/domain/Service.java | 73 +++
.../load/balancer/common/domain/Topology.java | 56 +++
...dBalancerApplicationSignUpEventReceiver.java | 74 +++
.../LoadBalancerDomainMappingEventReceiver.java | 86 ++++
.../LoadBalancerTopologyEventReceiver.java | 354 ++++++++++++++
.../LoadBalancerStatisticsReader.java | 11 +-
.../common/topology/TopologyProvider.java | 175 +++++++
.../algorithm/LoadBalanceAlgorithm.java | 2 +-
.../load/balancer/algorithm/RoundRobin.java | 4 +-
.../conf/LoadBalancerConfiguration.java | 80 ++--
.../balancer/context/LoadBalancerContext.java | 51 +-
.../context/LoadBalancerContextUtil.java | 475 -------------------
.../context/map/ClusterIdClusterMap.java | 57 ---
.../context/map/HostNameAppContextMap.java | 4 +
.../context/map/HostNameClusterMap.java | 63 ---
.../context/map/MultiTenantClusterMap.java | 77 ---
.../map/ServiceNameServiceContextMap.java | 58 ---
.../TenantIdSynapseEnvironmentServiceMap.java | 63 ---
.../balancer/endpoint/RequestDelegator.java | 48 +-
.../TenantAwareLoadBalanceEndpoint.java | 135 +++---
.../internal/LoadBalancerServiceComponent.java | 118 ++---
.../internal/ServiceReferenceHolder.java | 26 +
.../balancer/mediators/LocationReWriter.java | 5 +-
...dBalancerApplicationSignUpEventReceiver.java | 69 ---
.../LoadBalancerDomainMappingEventReceiver.java | 72 ---
.../LoadBalancerTenantEventReceiver.java | 148 ------
.../LoadBalancerTopologyEventReceiver.java | 396 ----------------
.../LoadBalancerStatisticsCollector.java | 15 +-
.../balancer/util/LoadBalancerConstants.java | 1 +
.../test/LoadBalancerConfigurationTest.java | 21 +-
.../stratos/rest/endpoint/mock/MockContext.java | 23 -
34 files changed, 1380 insertions(+), 1759 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Cluster.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Cluster.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Cluster.java
new file mode 100644
index 0000000..7f2fd9c
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Cluster.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.domain;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Load balancer cluster definition.
+ * <p>
+ * Keeps the host names, context paths, members, tenant range and load balance algorithm
+ * name of a single cluster, identified by its cluster id.
+ */
+public class Cluster {
+
+    private static final Log log = LogFactory.getLog(Cluster.class);
+
+    private final String serviceName;
+    private final String clusterId;
+    private final Set<String> hostNames;
+    private String tenantRange;
+    // Key: Member.memberId
+    // NOTE(review): this is a plain HashMap while the host name collections are concurrent;
+    // confirm that members are never modified while another thread reads them.
+    private final Map<String, Member> memberMap;
+    // Key: host name, value: context path
+    private final Map<String, String> hostNameToContextPathMap;
+    private String loadBalanceAlgorithmName;
+
+    /**
+     * Creates a cluster without host names or members.
+     *
+     * @param serviceName name of the service the cluster belongs to
+     * @param clusterId   id of the cluster
+     */
+    public Cluster(String serviceName, String clusterId) {
+        this.serviceName = serviceName;
+        this.clusterId = clusterId;
+        this.hostNames = Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
+        this.memberMap = new HashMap<String, Member>();
+        this.hostNameToContextPathMap = new ConcurrentHashMap<String, String>();
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    /**
+     * @return the live set of host names of this cluster (not a copy)
+     */
+    public Set<String> getHostNames() {
+        return hostNames;
+    }
+
+    public void addHostName(String hostName) {
+        hostNames.add(hostName);
+    }
+
+    /**
+     * Adds a host name together with the context path it maps to.
+     *
+     * @param hostName    host name to add
+     * @param contextPath context path of the host name; when null the host name is
+     *                    registered without a context path
+     */
+    public void addHostName(String hostName, String contextPath) {
+        hostNames.add(hostName);
+        // ConcurrentHashMap rejects null values: storing a null context path used to throw
+        // after the host name had already been added. Treat null as "no context path".
+        if (contextPath != null) {
+            hostNameToContextPathMap.put(hostName, contextPath);
+        } else {
+            hostNameToContextPathMap.remove(hostName);
+        }
+    }
+
+    public void addMember(Member member) {
+        memberMap.put(member.getMemberId(), member);
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member added to cluster: [cluster] %s [member] %s [hostname] %s [ports] %s",
+                    clusterId, member.getMemberId(), member.getHostName(), member.getPorts()));
+        }
+    }
+
+    /**
+     * Removes the member with the given id. An unknown member id is logged as a warning
+     * and otherwise ignored.
+     *
+     * @param memberId id of the member to remove
+     */
+    public void removeMember(String memberId) {
+        Member member = memberMap.remove(memberId);
+        if (member == null) {
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Could not remove member, member not found: [member] %s", memberId));
+            }
+            // Nothing was removed; the info log below would dereference the missing member.
+            return;
+        }
+
+        if (log.isInfoEnabled()) {
+            log.info(String.format("Member removed from cluster: [cluster] %s [member] %s [hostname] %s",
+                    clusterId, member.getMemberId(), member.getHostName()));
+        }
+    }
+
+    /**
+     * @param memberId id of the member to look up
+     * @return the member with the given id, or null if there is none
+     */
+    public Member getMember(String memberId) {
+        return memberMap.get(memberId);
+    }
+
+    /**
+     * @return a live view of the members of this cluster (not a copy)
+     */
+    public Collection<Member> getMembers() {
+        return memberMap.values();
+    }
+
+    public String getTenantRange() {
+        return tenantRange;
+    }
+
+    public void setTenantRange(String tenantRange) {
+        this.tenantRange = tenantRange;
+    }
+
+    /**
+     * Check whether a given tenant id is in tenant range of the cluster.
+     * <p>
+     * Accepted range formats are "*" (every tenant), "start-end" and "start-*" (no upper
+     * limit); both bounds are inclusive. An empty range, or a range that does not consist of
+     * two parts separated by "-", matches no tenant.
+     *
+     * @param tenantId tenant id to check
+     * @return true if the tenant id falls within the tenant range, false otherwise
+     * @throws NumberFormatException if a bound of a two part range is not a number
+     */
+    public boolean tenantIdInRange(int tenantId) {
+        // Read the range once so that all checks below see the same value
+        String range = getTenantRange();
+        if (StringUtils.isEmpty(range)) {
+            return false;
+        }
+        if ("*".equals(range)) {
+            return true;
+        }
+
+        String[] bounds = range.split("-");
+        if (bounds.length < 2) {
+            // e.g. "5" or "5-": there is no upper bound to read, so bounds[1] must not be touched
+            if (log.isWarnEnabled()) {
+                log.warn(String.format("Invalid tenant range found: [cluster] %s [tenant-range] %s",
+                        clusterId, range));
+            }
+            return false;
+        }
+
+        int tenantStart = Integer.parseInt(bounds[0]);
+        if (tenantId < tenantStart) {
+            return false;
+        }
+        String tenantEndStr = bounds[1];
+        return "*".equals(tenantEndStr) || tenantId <= Integer.parseInt(tenantEndStr);
+    }
+
+    public void setLoadBalanceAlgorithmName(String loadBalanceAlgorithmName) {
+        this.loadBalanceAlgorithmName = loadBalanceAlgorithmName;
+    }
+
+    public String getLoadBalanceAlgorithmName() {
+        return loadBalanceAlgorithmName;
+    }
+
+    /**
+     * @param hostName host name to look up
+     * @return the context path registered for the host name, or null if there is none
+     */
+    public String getContextPath(String hostName) {
+        return hostNameToContextPathMap.get(hostName);
+    }
+
+    /**
+     * Removes a host name and the context path registered for it, if any.
+     *
+     * @param hostName host name to remove
+     */
+    public void removeHostName(String hostName) {
+        hostNames.remove(hostName);
+        hostNameToContextPathMap.remove(hostName);
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Member.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Member.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Member.java
new file mode 100644
index 0000000..dae7e8c
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Member.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.domain;
+
+import java.util.*;
+
+/**
+ * Load balancer member definition.
+ * <p>
+ * A member is identified by its member id, belongs to a cluster of a service and holds
+ * a set of ports keyed by their proxy port.
+ */
+public class Member {
+
+    private final String serviceName;
+    private final String clusterId;
+    private final String memberId;
+    private final String hostName;
+    // Key: Port.proxy
+    private final Map<Integer, Port> portMap;
+
+    /**
+     * Creates a member without ports.
+     *
+     * @param serviceName name of the service the member belongs to
+     * @param clusterId   id of the cluster the member belongs to
+     * @param memberId    id of the member
+     * @param hostName    host name of the member
+     */
+    public Member(String serviceName, String clusterId, String memberId, String hostName) {
+        this.serviceName = serviceName;
+        this.clusterId = clusterId;
+        this.memberId = memberId;
+        this.hostName = hostName;
+        this.portMap = new HashMap<Integer, Port>();
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public String getMemberId() {
+        return memberId;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    /**
+     * @param proxy proxy port to look up
+     * @return the port registered for the given proxy port, or null if there is none
+     */
+    public Port getPort(int proxy) {
+        // A single lookup is enough: get() already returns null for an unknown key
+        return portMap.get(proxy);
+    }
+
+    /**
+     * Adds a port, replacing any port registered earlier for the same proxy port.
+     *
+     * @param port port to add
+     */
+    public void addPort(Port port) {
+        this.portMap.put(port.getProxy(), port);
+    }
+
+    public void addPorts(Collection<Port> ports) {
+        for (Port port : ports) {
+            addPort(port);
+        }
+    }
+
+    /**
+     * Removes the port registered for the proxy port of the given port.
+     *
+     * @param port port whose proxy port identifies the entry to remove
+     */
+    public void removePort(Port port) {
+        this.portMap.remove(port.getProxy());
+    }
+
+    /**
+     * @param port port whose proxy port is checked
+     * @return true if a port is registered for the proxy port of the given port
+     */
+    public boolean portExists(Port port) {
+        return this.portMap.containsKey(port.getProxy());
+    }
+
+    /**
+     * @return a live view of the ports of this member (not a copy)
+     */
+    public Collection<Port> getPorts() {
+        return portMap.values();
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Port.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Port.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Port.java
new file mode 100644
index 0000000..07d1784
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Port.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.domain;
+
+/**
+ * Load balancer port definition: a protocol together with a port value and a proxy port.
+ */
+public class Port {
+
+    private final String protocol;
+    private final int value;
+    private final int proxy;
+
+    /**
+     * @param protocol protocol of the port
+     * @param value    port value
+     * @param proxy    proxy port
+     */
+    public Port(String protocol, int value, int proxy) {
+        this.protocol = protocol;
+        this.value = value;
+        this.proxy = proxy;
+    }
+
+    public String getProtocol() {
+        return this.protocol;
+    }
+
+    public int getValue() {
+        return this.value;
+    }
+
+    public int getProxy() {
+        return this.proxy;
+    }
+
+    /**
+     * @return a text of the form {@code Port [protocol=..., value=..., proxy=...]}
+     */
+    @Override
+    public String toString() {
+        StringBuilder text = new StringBuilder("Port [protocol=");
+        text.append(protocol);
+        text.append(", value=").append(value);
+        text.append(", proxy=").append(proxy);
+        return text.append("]").toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
new file mode 100644
index 0000000..579035b
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.domain;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Load balancer service definition.
+ * <p>
+ * A service is identified by its name and holds its clusters keyed by cluster id.
+ */
+public class Service {
+    private final String serviceName;
+    // Key: Cluster.clusterId
+    private final Map<String, Cluster> clusterIdClusterMap;
+    private boolean multiTenant;
+
+    /**
+     * Creates a service without clusters.
+     *
+     * @param serviceName   name of the service
+     * @param isMultiTenant initial value of the multi tenant flag
+     */
+    public Service(String serviceName, boolean isMultiTenant) {
+        this.serviceName = serviceName;
+        this.clusterIdClusterMap = new ConcurrentHashMap<String, Cluster>();
+        // Initialise the same flag that isMultiTenant() reads and setMultiTenant() updates
+        this.multiTenant = isMultiTenant;
+    }
+
+    public String getServiceName() {
+        return serviceName;
+    }
+
+    /**
+     * @return a live view of the clusters of this service (not a copy)
+     */
+    public Collection<Cluster> getClusters() {
+        return clusterIdClusterMap.values();
+    }
+
+    /**
+     * Adds a cluster, replacing any cluster registered earlier under the same cluster id.
+     *
+     * @param cluster cluster to add
+     */
+    public void addCluster(Cluster cluster) {
+        this.clusterIdClusterMap.put(cluster.getClusterId(), cluster);
+    }
+
+    public void removeCluster(String clusterId) {
+        this.clusterIdClusterMap.remove(clusterId);
+    }
+
+    public boolean clusterExists(String clusterId) {
+        return this.clusterIdClusterMap.containsKey(clusterId);
+    }
+
+    /**
+     * @param clusterId id of the cluster to look up
+     * @return the cluster with the given id, or null if there is none
+     */
+    public Cluster getCluster(String clusterId) {
+        return this.clusterIdClusterMap.get(clusterId);
+    }
+
+    /**
+     * @return the multi tenant flag given at construction or set later via
+     * {@link #setMultiTenant(boolean)}
+     */
+    public boolean isMultiTenant() {
+        return multiTenant;
+    }
+
+    public void setMultiTenant(boolean multiTenant) {
+        this.multiTenant = multiTenant;
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Topology.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Topology.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Topology.java
new file mode 100644
index 0000000..59b598d
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Topology.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.domain;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Load balancer topology definition: the set of services, keyed by service name.
+ */
+public class Topology {
+
+    // Key: Service.serviceName
+    private Map<String, Service> serviceMap;
+
+    public Topology() {
+        this.serviceMap = new ConcurrentHashMap<String, Service>();
+    }
+
+    /**
+     * Registers a service under its service name.
+     */
+    public void addService(Service service) {
+        String serviceName = service.getServiceName();
+        serviceMap.put(serviceName, service);
+    }
+
+    /**
+     * Drops the service registered under the given name, if any.
+     */
+    public void removeService(String serviceName) {
+        serviceMap.remove(serviceName);
+    }
+
+    /**
+     * @return the service registered under the given name, or null if there is none
+     */
+    public Service getService(String serviceName) {
+        return serviceMap.get(serviceName);
+    }
+
+    /**
+     * @return true if a service is registered under the given name
+     */
+    public boolean serviceExists(String serviceName) {
+        return serviceMap.containsKey(serviceName);
+    }
+
+    /**
+     * @return a live view of all registered services (not a copy)
+     */
+    public Collection<Service> getServices() {
+        return this.serviceMap.values();
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerApplicationSignUpEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerApplicationSignUpEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerApplicationSignUpEventReceiver.java
new file mode 100644
index 0000000..1f4e26b
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerApplicationSignUpEventReceiver.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.event.receivers;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
+import org.apache.stratos.messaging.domain.application.signup.ApplicationSignUp;
+import org.apache.stratos.messaging.domain.application.signup.DomainMapping;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.application.signup.CompleteApplicationSignUpsEvent;
+import org.apache.stratos.messaging.listener.application.signup.CompleteApplicationSignUpsEventListener;
+import org.apache.stratos.messaging.message.receiver.application.signup.ApplicationSignUpEventReceiver;
+
+/**
+ * Application signup event receiver of the load balancer. When a complete application
+ * signups event arrives, the domain name of every domain mapping found in it is added as a
+ * host name to the matching cluster of the given topology provider.
+ */
+public class LoadBalancerApplicationSignUpEventReceiver extends ApplicationSignUpEventReceiver {
+
+    private static final Log log = LogFactory.getLog(LoadBalancerApplicationSignUpEventReceiver.class);
+
+    private TopologyProvider topologyProvider;
+
+    public LoadBalancerApplicationSignUpEventReceiver(TopologyProvider topologyProvider) {
+        this.topologyProvider = topologyProvider;
+        addEventListeners();
+    }
+
+    /**
+     * Registers the listener for complete application signups events.
+     */
+    private void addEventListeners() {
+        addEventListener(new CompleteApplicationSignUpsEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Complete application signup event received");
+                }
+
+                CompleteApplicationSignUpsEvent signUpsEvent = (CompleteApplicationSignUpsEvent) event;
+                for (ApplicationSignUp signUp : signUpsEvent.getApplicationSignUps()) {
+                    if (signUp.getDomainMappings() == null) {
+                        // This signup carries no domain mappings
+                        continue;
+                    }
+                    for (DomainMapping mapping : signUp.getDomainMappings()) {
+                        if (mapping == null) {
+                            continue;
+                        }
+                        Cluster cluster = topologyProvider.getClusterByClusterId(mapping.getClusterId());
+                        if (cluster == null) {
+                            // Mappings of clusters not known to the topology provider are skipped
+                            continue;
+                        }
+                        cluster.addHostName(mapping.getDomainName());
+                        log.info(String.format("Domain mapping added: [cluster] %s [domain] %s",
+                                cluster.getClusterId(), mapping.getDomainName()));
+                    }
+                }
+            }
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerDomainMappingEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerDomainMappingEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerDomainMappingEventReceiver.java
new file mode 100644
index 0000000..8634abc
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerDomainMappingEventReceiver.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.event.receivers;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.domain.mapping.DomainMappingAddedEvent;
+import org.apache.stratos.messaging.event.domain.mapping.DomainMappingRemovedEvent;
+import org.apache.stratos.messaging.listener.domain.mapping.DomainMappingAddedEventListener;
+import org.apache.stratos.messaging.listener.domain.mapping.DomainMappingRemovedEventListener;
+import org.apache.stratos.messaging.message.receiver.domain.mapping.DomainMappingEventReceiver;
+
+/**
+ * Load balancer domain mapping event receiver updates the topology in the given topology provider
+ * with the domains found in domain mapping events.
+ * <p>
+ * Events that refer to a cluster the topology provider does not know are logged as a warning
+ * and ignored.
+ */
+public class LoadBalancerDomainMappingEventReceiver extends DomainMappingEventReceiver {
+
+    private static final Log log = LogFactory.getLog(LoadBalancerDomainMappingEventReceiver.class);
+
+    private TopologyProvider topologyProvider;
+
+    public LoadBalancerDomainMappingEventReceiver(TopologyProvider topologyProvider) {
+        this.topologyProvider = topologyProvider;
+        addEventListeners();
+    }
+
+    /**
+     * Registers the listeners for domain mapping added and domain mapping removed events.
+     */
+    private void addEventListeners() {
+
+        addEventListener(new DomainMappingAddedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                DomainMappingAddedEvent domainMappingAddedEvent = (DomainMappingAddedEvent) event;
+
+                String domainName = domainMappingAddedEvent.getDomainName();
+                String contextPath = domainMappingAddedEvent.getContextPath();
+
+                String clusterId = domainMappingAddedEvent.getClusterId();
+                Cluster cluster = topologyProvider.getClusterByClusterId(clusterId);
+                if (cluster == null) {
+                    log.warn(String.format("Could not add domain mapping, cluster not found: [cluster] %s", clusterId));
+                    // Nothing to update; going on would dereference the missing cluster
+                    return;
+                }
+
+                cluster.addHostName(domainName, contextPath);
+                log.info(String.format("Domain mapping added: [cluster] %s [domain] %s", clusterId, domainName));
+            }
+        });
+
+        addEventListener(new DomainMappingRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                DomainMappingRemovedEvent domainMappingRemovedEvent = (DomainMappingRemovedEvent) event;
+
+                String clusterId = domainMappingRemovedEvent.getClusterId();
+                Cluster cluster = topologyProvider.getClusterByClusterId(clusterId);
+                if (cluster == null) {
+                    log.warn(String.format("Could not remove domain mapping, cluster not found: [cluster] %s", clusterId));
+                    // Nothing to update; going on would dereference the missing cluster
+                    return;
+                }
+
+                String domainName = domainMappingRemovedEvent.getDomainName();
+                cluster.removeHostName(domainName);
+                log.info(String.format("Domain mapping removed: [cluster] %s [domain] %s", clusterId, domainName));
+            }
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerTopologyEventReceiver.java
new file mode 100644
index 0000000..ee5b5b3
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerTopologyEventReceiver.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.event.receivers;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.MemberStatus;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.listener.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * Load balancer topology receiver updates the topology in the given topology provider
+ * according to topology events.
+ */
+public class LoadBalancerTopologyEventReceiver extends TopologyEventReceiver {
+
+ private static final Log log = LogFactory.getLog(LoadBalancerTopologyEventReceiver.class);
+
+ private TopologyProvider topologyProvider;
+
+ /**
+  * Creates a receiver that applies topology events to the given topology provider
+  * and registers its event listeners.
+  *
+  * @param topologyProvider topology provider updated by the registered listeners
+  */
+ public LoadBalancerTopologyEventReceiver(TopologyProvider topologyProvider) {
+     this.topologyProvider = topologyProvider;
+     addEventListeners();
+ }
+
+ /**
+  * Delegates to the parent receiver's execute() and then logs that the
+  * load balancer topology receiver has been started.
+  */
+ @Override
+ public void execute() {
+     super.execute();
+     if (log.isInfoEnabled()) {
+         log.info("Load balancer topology receiver thread started");
+     }
+ }
+
+ private void addEventListeners() {
+
+ addEventListener(new CompleteTopologyEventListener() {
+     // Set once a complete topology event has been processed without an error
+     private boolean initialized;
+
+     /**
+      * On the first successfully processed complete topology event, adds every
+      * member whose status is Active to the topology provider. Events received
+      * after that are ignored.
+      */
+     @Override
+     protected void onEvent(Event event) {
+         if (initialized) {
+             return;
+         }
+         try {
+             TopologyManager.acquireReadLock();
+             for (Service topologyService : TopologyManager.getTopology().getServices()) {
+                 for (Cluster topologyCluster : topologyService.getClusters()) {
+                     for (Member topologyMember : topologyCluster.getMembers()) {
+                         if (topologyMember.getStatus() != MemberStatus.Active) {
+                             continue;
+                         }
+                         addMember(topologyCluster.getServiceName(), topologyMember.getClusterId(),
+                                 topologyMember.getMemberId());
+                     }
+                 }
+             }
+             initialized = true;
+         } catch (Exception e) {
+             log.error("Error processing event", e);
+         } finally {
+             TopologyManager.releaseReadLock();
+         }
+     }
+ });
+
+ addEventListener(new MemberActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+ String serviceName = memberActivatedEvent.getServiceName();
+ String clusterId = memberActivatedEvent.getClusterId();
+ String memberId = memberActivatedEvent.getMemberId();
+
+ try {
+ TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
+ addMember(serviceName, clusterId, memberId);
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
+ }
+ }
+ });
+
+ addEventListener(new MemberMaintenanceListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ MemberMaintenanceModeEvent memberMaintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+
+ String serviceName = memberMaintenanceModeEvent.getServiceName();
+ String clusterId = memberMaintenanceModeEvent.getClusterId();
+ String memberId = memberMaintenanceModeEvent.getMemberId();
+
+ try {
+ TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
+ removeMember(serviceName, clusterId, memberId);
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLockForCluster(serviceName,
+ clusterId);
+ }
+ }
+ });
+
+ addEventListener(new MemberSuspendedEventListener() {
+     /**
+      * Removes the suspended member from the topology provider, acquiring the
+      * read lock of the member's cluster for the duration of the update.
+      */
+     @Override
+     protected void onEvent(Event event) {
+
+         MemberSuspendedEvent memberSuspendedEvent = (MemberSuspendedEvent) event;
+
+         String serviceName = memberSuspendedEvent.getServiceName();
+         String clusterId = memberSuspendedEvent.getClusterId();
+         String memberId = memberSuspendedEvent.getMemberId();
+
+         try {
+             TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
+             removeMember(serviceName, clusterId, memberId);
+         } catch (Exception e) {
+             log.error("Error processing event", e);
+         } finally {
+             // Release with the same values that were used to acquire the lock
+             TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
+         }
+     }
+ });
+
+ addEventListener(new MemberTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+
+ String serviceName = memberTerminatedEvent.getServiceName();
+ String clusterId = memberTerminatedEvent.getClusterId();
+ String memberId = memberTerminatedEvent.getMemberId();
+
+ try {
+ TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
+ removeMember(serviceName, clusterId, memberId);
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
+ }
+ }
+ });
+
+ addEventListener(new ClusterRemovedEventListener() {
+     /**
+      * Looks up the removed cluster in the topology and removes its members
+      * from the topology provider. A warning is logged and nothing is removed
+      * when either the service or the cluster cannot be found.
+      */
+     @Override
+     protected void onEvent(Event event) {
+
+         ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+         String serviceName = clusterRemovedEvent.getServiceName();
+         String clusterId = clusterRemovedEvent.getClusterId();
+
+         try {
+             TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
+
+             Service service = TopologyManager.getTopology().getService(serviceName);
+             if (service == null) {
+                 if (log.isWarnEnabled()) {
+                     log.warn(String.format("Service not found in topology: [service] %s", serviceName));
+                 }
+                 return;
+             }
+
+             Cluster cluster = service.getCluster(clusterId);
+             if (cluster == null) {
+                 // Without this guard removeCluster() would dereference null and the
+                 // NullPointerException would only show up as a generic processing error
+                 if (log.isWarnEnabled()) {
+                     log.warn(String.format("Cluster not found in topology: [service] %s [cluster] %s",
+                             serviceName, clusterId));
+                 }
+                 return;
+             }
+             removeCluster(cluster);
+         } catch (Exception e) {
+             log.error("Error processing event", e);
+         } finally {
+             TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
+         }
+     }
+ });
+
+ addEventListener(new ServiceRemovedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event;
+ String serviceName = serviceRemovedEvent.getServiceName();
+
+ try {
+ TopologyManager.acquireReadLockForService(serviceName);
+
+ Service service = TopologyManager.getTopology().getService(serviceName);
+ if (service == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Service not found in topology: [service] %s",
+ serviceName));
+ }
+ return;
+ }
+ for(Cluster cluster : service.getClusters()) {
+ removeCluster(cluster);
+ }
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ TopologyManager.releaseReadLockForService(serviceName);
+ }
+ }
+ });
+ }
+
+ /**
+  * Removes every member of the given messaging cluster from the topology provider
+  * by delegating to removeMember() for each of them.
+  *
+  * @param cluster messaging cluster whose members are removed
+  */
+ private void removeCluster(Cluster cluster) {
+     for (Member clusterMember : cluster.getMembers()) {
+         String memberServiceName = clusterMember.getServiceName();
+         String memberClusterId = clusterMember.getClusterId();
+         removeMember(memberServiceName, memberClusterId, clusterMember.getMemberId());
+     }
+ }
+
+ /**
+  * Adds a member to the topology provider. The service, cluster and member are
+  * looked up in the messaging topology; a warning is logged and the method returns
+  * when any of them is missing. The cluster is added to the topology provider
+  * first if the provider does not know it yet.
+  *
+  * @param serviceName name of the service the member belongs to
+  * @param clusterId   id of the cluster the member belongs to
+  * @param memberId    id of the member to add
+  */
+ private void addMember(String serviceName, String clusterId, String memberId) {
+     Service topologyService = TopologyManager.getTopology().getService(serviceName);
+     if (topologyService == null) {
+         if (log.isWarnEnabled()) {
+             log.warn(String.format("Service not found in topology: [service] %s",
+                     serviceName));
+         }
+         return;
+     }
+
+     Cluster topologyCluster = topologyService.getCluster(clusterId);
+     if (topologyCluster == null) {
+         if (log.isWarnEnabled()) {
+             log.warn(String.format("Cluster not found in topology: [service] %s [cluster] %s",
+                     serviceName, clusterId));
+         }
+         return;
+     }
+     // Throws a RuntimeException when the cluster has no host names
+     validateHostNames(topologyCluster);
+
+     // Register the cluster with the provider before its first member is added
+     boolean clusterKnown = topologyProvider.clusterExistsByClusterId(topologyCluster.getClusterId());
+     if (!clusterKnown) {
+         topologyProvider.addCluster(transformCluster(topologyCluster));
+     }
+
+     Member topologyMember = topologyCluster.getMember(memberId);
+     if (topologyMember == null) {
+         if (log.isWarnEnabled()) {
+             log.warn(String.format("Member not found in topology: [service] %s [cluster] %s [member] %s",
+                     serviceName, clusterId,
+                     memberId));
+         }
+         return;
+     }
+     topologyProvider.addMember(transformMember(topologyMember));
+ }
+
+ /**
+  * Removes a member from the topology provider. The service, cluster and member
+  * are looked up in the messaging topology; a warning is logged and the method
+  * returns when any of them is missing.
+  *
+  * @param serviceName name of the service the member belongs to
+  * @param clusterId   id of the cluster the member belongs to
+  * @param memberId    id of the member to remove
+  */
+ private void removeMember(String serviceName, String clusterId, String memberId) {
+     Service service = TopologyManager.getTopology().getService(serviceName);
+     if (service == null) {
+         if (log.isWarnEnabled()) {
+             log.warn(String.format("Service not found in topology: [service] %s",
+                     serviceName));
+         }
+         return;
+     }
+
+     Cluster cluster = service.getCluster(clusterId);
+     if (cluster == null) {
+         if (log.isWarnEnabled()) {
+             log.warn(String.format("Cluster not found in topology: [service] %s [cluster] %s",
+                     serviceName, clusterId));
+         }
+         return;
+     }
+     // Throws a RuntimeException when the cluster has no host names
+     validateHostNames(cluster);
+
+     Member member = cluster.getMember(memberId);
+     if (member == null) {
+         if (log.isWarnEnabled()) {
+             log.warn(String.format("Member not found in topology: [service] %s [cluster] %s [member] %s",
+                     serviceName, clusterId,
+                     memberId));
+         }
+         return;
+     }
+
+     // member is known to be non-null here, so no further check is needed
+     topologyProvider.removeMember(cluster.getClusterId(), member.getMemberId());
+ }
+
+ /**
+  * Verifies that the given messaging cluster has at least one host name.
+  *
+  * @param cluster messaging cluster to check
+  * @throws RuntimeException if the host names are null or empty
+  */
+ private void validateHostNames(Cluster cluster) {
+     boolean hostNamesAvailable = (cluster.getHostNames() != null) && (cluster.getHostNames().size() != 0);
+     if (hostNamesAvailable) {
+         return;
+     }
+     throw new RuntimeException(String.format("Host names not found in cluster: " +
+             "[cluster] %s", cluster.getClusterId()));
+ }
+
+ /**
+  * Builds a load balancer domain cluster from a messaging cluster, copying the
+  * service name, cluster id, tenant range and, when present, the host names.
+  *
+  * @param messagingCluster messaging cluster to convert
+  * @return the corresponding load balancer domain cluster
+  */
+ private org.apache.stratos.load.balancer.common.domain.Cluster transformCluster(Cluster messagingCluster) {
+     org.apache.stratos.load.balancer.common.domain.Cluster lbCluster =
+             new org.apache.stratos.load.balancer.common.domain.Cluster(messagingCluster.getServiceName(),
+                     messagingCluster.getClusterId());
+     lbCluster.setTenantRange(messagingCluster.getTenantRange());
+     if (messagingCluster.getHostNames() == null) {
+         return lbCluster;
+     }
+     for (String clusterHostName : messagingCluster.getHostNames()) {
+         lbCluster.addHostName(clusterHostName);
+     }
+     return lbCluster;
+ }
+
+ /**
+  * Builds a load balancer domain member from a messaging member. The member's
+  * default public IP is used as its host name when the system property
+  * "load.balancer.member.public.ip" is true; otherwise the default private IP is used.
+  *
+  * @param messagingMember messaging member to convert
+  * @return the corresponding load balancer domain member
+  */
+ private org.apache.stratos.load.balancer.common.domain.Member transformMember(Member messagingMember) {
+     String memberHostName;
+     if (Boolean.getBoolean("load.balancer.member.public.ip")) {
+         memberHostName = messagingMember.getDefaultPublicIP();
+     } else {
+         memberHostName = messagingMember.getDefaultPrivateIP();
+     }
+     return new org.apache.stratos.load.balancer.common.domain.Member(messagingMember.getServiceName(),
+             messagingMember.getClusterId(), messagingMember.getMemberId(),
+             memberHostName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
index 79386bd..c0016fe 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
@@ -27,11 +27,18 @@ import org.apache.stratos.messaging.domain.topology.Cluster;
public interface LoadBalancerStatisticsReader {
/**
- * Get in-flight request count of a sliding window configured e.g. Requests in flight of last minute.
+ * Returns in-flight request count of sliding window configured.
* @param clusterId
*/
int getInFlightRequestCount(String clusterId);
- int getServedRequestCount(String clusterId);
+
int getActiveInstancesCount(Cluster cluster);
+
+ /**
+ * Returns the number of requests served since the last time this method was called.
+ * @param clusterId
+ * @return
+ */
+ int getServedRequestCount(String clusterId);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyProvider.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyProvider.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyProvider.java
new file mode 100644
index 0000000..0eb77f8
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/topology/TopologyProvider.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.topology;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
+import org.apache.stratos.load.balancer.common.domain.Member;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Load balancer topology provider.
+ */
+public class TopologyProvider {
+
+ private static final Log log = LogFactory.getLog(TopologyProvider.class);
+
+ private Map<String, Cluster> clusterIdToClusterMap;
+ private Map<String, Cluster> hostNameToClusterMap;
+ private Map<String, Map<Integer, Cluster>> hostNameToTenantIdToClusterMap;
+
+ /**
+  * Creates a topology provider with empty cluster id, host name and
+  * host name/tenant id lookup maps.
+  */
+ public TopologyProvider() {
+     clusterIdToClusterMap = new ConcurrentHashMap<String, Cluster>();
+     hostNameToClusterMap = new ConcurrentHashMap<String, Cluster>();
+     hostNameToTenantIdToClusterMap = new ConcurrentHashMap<String, Map<Integer, Cluster>>();
+ }
+
+ /**
+  * Registers a cluster by its cluster id and by each of its host names.
+  * A null cluster is ignored. The addition is logged only when the cluster
+  * has at least one host name.
+  *
+  * @param cluster cluster to add, may be null
+  */
+ public void addCluster(Cluster cluster) {
+     if (cluster == null) {
+         return;
+     }
+     clusterIdToClusterMap.put(cluster.getClusterId(), cluster);
+
+     // Check the host names before iterating them, so that a cluster without
+     // host names is still registered by id instead of failing on the loop
+     if ((cluster.getHostNames() == null) || (cluster.getHostNames().size() == 0)) {
+         return;
+     }
+
+     for (String hostName : cluster.getHostNames()) {
+         hostNameToClusterMap.put(hostName, cluster);
+     }
+     log.info(String.format("Cluster added: [cluster] %s [hostnames] %s", cluster.getClusterId(),
+             cluster.getHostNames()));
+ }
+
+ public void removeCluster(String clusterId) {
+ Cluster cluster = getClusterByClusterId(clusterId);
+ if(cluster == null) {
+ log.warn(String.format("Could not remove cluster, cluster not found: [cluster] %s", clusterId));
+ return;
+ }
+
+ for(String hostName : cluster.getHostNames()) {
+ hostNameToClusterMap.remove(hostName);
+ }
+ clusterIdToClusterMap.remove(cluster.getClusterId());
+
+ if((cluster.getHostNames() != null) && (cluster.getHostNames().size() > 0)) {
+ log.info(String.format("Cluster removed: [cluster] %s [hostnames] %s", cluster.getClusterId(),
+ cluster.getHostNames()));
+ }
+ }
+
+ public boolean clusterExistsByClusterId(String clusterId) {
+ return (getClusterByClusterId(clusterId) != null);
+ }
+
+ /**
+  * Tells whether the given host name is known, either in the host name map
+  * or in the host name/tenant id map.
+  *
+  * @param hostName host name to look up
+  * @return true if either map contains the host name as a key
+  */
+ public boolean clusterExistsByHostName(String hostName) {
+     if (hostNameToClusterMap.containsKey(hostName)) {
+         return true;
+     }
+     return hostNameToTenantIdToClusterMap.containsKey(hostName);
+ }
+
+ /**
+  * Looks up a cluster by its id.
+  *
+  * @param clusterId cluster id to look up
+  * @return the cluster stored under the id, or null if the map has no entry for it
+  */
+ public Cluster getClusterByClusterId(String clusterId) {
+     return clusterIdToClusterMap.get(clusterId);
+ }
+
+ /**
+  * Registers a cluster for a tenant under each of the cluster's host names.
+  * A null cluster is ignored. The subscription is logged when at least one
+  * host name was registered.
+  *
+  * @param cluster  cluster to register, may be null
+  * @param tenantId id of the tenant the cluster is registered for
+  */
+ public void addCluster(Cluster cluster, int tenantId) {
+     if (cluster == null) {
+         return;
+     }
+
+     boolean subscribed = false;
+     for (String clusterHostName : cluster.getHostNames()) {
+         Map<Integer, Cluster> clustersByTenantId = hostNameToTenantIdToClusterMap.get(clusterHostName);
+         if (clustersByTenantId == null) {
+             // First tenant for this host name: create its tenant id map
+             clustersByTenantId = new ConcurrentHashMap<Integer, Cluster>();
+             hostNameToTenantIdToClusterMap.put(clusterHostName, clustersByTenantId);
+         }
+         clustersByTenantId.put(tenantId, cluster);
+         subscribed = true;
+     }
+
+     if (!subscribed) {
+         return;
+     }
+     log.info(String.format("Tenant subscribed to cluster: [tenant] %d [cluster] %s [hostnames] %s",
+             tenantId, cluster.getClusterId(), cluster.getHostNames()));
+ }
+
+ /**
+  * Removes the tenant's entry for each host name of the given cluster.
+  * A warning is logged and nothing is removed when the cluster is not found.
+  *
+  * @param clusterId id of the cluster the tenant is removed from
+  * @param tenantId  id of the tenant to remove
+  */
+ public void removeCluster(String clusterId, int tenantId) {
+     Cluster cluster = getClusterByClusterId(clusterId);
+     if (cluster == null) {
+         log.warn(String.format("Could not remove cluster, cluster not found: [cluster] %s", clusterId));
+         // Returning here avoids dereferencing the missing cluster in the loop below
+         return;
+     }
+
+     for (String hostName : cluster.getHostNames()) {
+         Map<Integer, Cluster> tenantIdToClusterMap = hostNameToTenantIdToClusterMap.get(hostName);
+         if (tenantIdToClusterMap != null) {
+             Cluster tenantCluster = tenantIdToClusterMap.get(tenantId);
+             if (tenantCluster != null) {
+                 tenantIdToClusterMap.remove(tenantId);
+                 log.info(String.format("Tenant un-subscribed from cluster: [tenant] %d [cluster] %s [hostnames] %s",
+                         tenantId, cluster.getClusterId(), cluster.getHostNames()));
+             }
+         }
+     }
+ }
+
+ /**
+  * Looks up a cluster by host name.
+  *
+  * @param hostName host name to look up
+  * @return the cluster stored under the host name, or null if the map has no entry for it
+  */
+ public Cluster getClusterByHostName(String hostName) {
+     return hostNameToClusterMap.get(hostName);
+ }
+
+ /**
+  * Looks up the cluster registered for a tenant under the given host name.
+  *
+  * @param hostName host name to look up
+  * @param tenantId id of the tenant
+  * @return the matching cluster, or null if the host name has no tenant map
+  *         or the tenant map has no entry for the tenant id
+  */
+ public Cluster getClusterByHostName(String hostName, int tenantId) {
+     Map<Integer, Cluster> clustersByTenantId = hostNameToTenantIdToClusterMap.get(hostName);
+     return (clustersByTenantId == null) ? null : clustersByTenantId.get(tenantId);
+ }
+
+ /**
+  * Adds a member to the cluster identified by the member's cluster id.
+  * A warning is logged and the member is not added when that cluster is not registered.
+  *
+  * @param member member to add
+  */
+ public void addMember(Member member) {
+     Cluster owningCluster = getClusterByClusterId(member.getClusterId());
+     if (owningCluster != null) {
+         owningCluster.addMember(member);
+         log.info(String.format("Member added to cluster: [cluster] %s [member] %s",
+                 member.getClusterId(), member.getHostName()));
+         return;
+     }
+
+     log.warn(String.format("Could not add member, cluster not found: [cluster] %s",
+             member.getClusterId()));
+ }
+
+ /**
+  * Removes a member from a cluster and removes the cluster itself once it has
+  * no members left. A warning is logged when the cluster is not registered;
+  * nothing happens when the cluster does not contain the member.
+  *
+  * @param clusterId id of the cluster the member belongs to
+  * @param memberId  id of the member to remove
+  */
+ public void removeMember(String clusterId, String memberId) {
+     Cluster owningCluster = getClusterByClusterId(clusterId);
+     if (owningCluster == null) {
+         log.warn(String.format("Could not remove member, cluster not found: [cluster] %s", clusterId));
+         return;
+     }
+
+     Member existingMember = owningCluster.getMember(memberId);
+     if (existingMember == null) {
+         return;
+     }
+
+     owningCluster.removeMember(memberId);
+     log.info(String.format("Member removed from cluster: [cluster] %s [member] %s",
+             clusterId, existingMember.getHostName()));
+
+     // Drop the cluster as soon as its last member is gone
+     if (owningCluster.getMembers().size() == 0) {
+         log.info(String.format("No members found in cluster, removing cluster: " +
+                 "[cluster] %s", owningCluster.getClusterId()));
+         removeCluster(owningCluster.getClusterId());
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/algorithm/LoadBalanceAlgorithm.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/algorithm/LoadBalanceAlgorithm.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/algorithm/LoadBalanceAlgorithm.java
index d561eb6..1344c0b 100755
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/algorithm/LoadBalanceAlgorithm.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/algorithm/LoadBalanceAlgorithm.java
@@ -20,7 +20,7 @@
package org.apache.stratos.load.balancer.algorithm;
import org.apache.stratos.load.balancer.context.AlgorithmContext;
-import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.load.balancer.common.domain.Member;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/algorithm/RoundRobin.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/algorithm/RoundRobin.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/algorithm/RoundRobin.java
index f96e44c..af08f79 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/algorithm/RoundRobin.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/algorithm/RoundRobin.java
@@ -22,7 +22,7 @@ package org.apache.stratos.load.balancer.algorithm;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.load.balancer.context.AlgorithmContext;
-import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.load.balancer.common.domain.Member;
import java.util.List;
import java.util.concurrent.locks.Lock;
@@ -67,7 +67,7 @@ public class RoundRobin implements LoadBalanceAlgorithm {
currentMemberIndex++;
}
index--;
- } while ((!current.isActive()) && index > 0);
+ } while (index > 0);
algorithmContext.setCurrentMemberIndex(currentMemberIndex);
} finally {
lock.unlock();
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
index 9e86a6e..658ce54 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
@@ -23,6 +23,10 @@ import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.constants.StratosConstants;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
+import org.apache.stratos.load.balancer.common.domain.Member;
+import org.apache.stratos.load.balancer.common.domain.Port;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
import org.apache.stratos.load.balancer.conf.domain.Algorithm;
import org.apache.stratos.load.balancer.conf.domain.MemberIpType;
import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier;
@@ -30,10 +34,7 @@ import org.apache.stratos.load.balancer.conf.structure.Node;
import org.apache.stratos.load.balancer.conf.structure.NodeBuilder;
import org.apache.stratos.load.balancer.conf.util.Constants;
import org.apache.stratos.load.balancer.context.LoadBalancerContext;
-import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil;
import org.apache.stratos.load.balancer.exception.InvalidConfigurationException;
-import org.apache.stratos.messaging.domain.topology.*;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import java.io.File;
import java.util.*;
@@ -66,6 +67,7 @@ public class LoadBalancerConfiguration {
private String networkPartitionId;
private boolean reWriteLocationHeader;
private boolean domainMappingEnabled;
+ private TopologyProvider topologyProvider;
/**
* Load balancer configuration is singleton.
@@ -100,8 +102,6 @@ public class LoadBalancerConfiguration {
instance = null;
// Clear load balancer context
LoadBalancerContext.getInstance().clear();
- // Clear topology
- TopologyManager.getTopology().clear();
}
}
@@ -269,6 +269,14 @@ public class LoadBalancerConfiguration {
this.domainMappingEnabled = domainMappingEnabled;
}
+ /**
+  * Sets the topology provider held by this configuration.
+  *
+  * @param topologyProvider topology provider to store
+  */
+ public void setTopologyProvider(TopologyProvider topologyProvider) {
+     this.topologyProvider = topologyProvider;
+ }
+
+ /**
+  * Returns the topology provider held by this configuration.
+  *
+  * @return the stored topology provider
+  */
+ public TopologyProvider getTopologyProvider() {
+     return this.topologyProvider;
+ }
+
private static class LoadBalancerConfigurationReader {
public LoadBalancerConfiguration readFromFile() {
@@ -323,7 +331,7 @@ public class LoadBalancerConfiguration {
} else {
// Endpoint timeout is not found, set default value
configuration.setEndpointTimeout(Constants.DEFAULT_ENDPOINT_TIMEOUT);
- if(log.isWarnEnabled()) {
+ if (log.isWarnEnabled()) {
log.warn(String.format("Endpoint timeout not found, using default: %d", configuration.getEndpointTimeout()));
}
}
@@ -334,7 +342,7 @@ public class LoadBalancerConfiguration {
} else {
// Session timeout is not found, set default value
configuration.setSessionTimeout(Constants.DEFAULT_SESSION_TIMEOUT);
- if(log.isWarnEnabled()) {
+ if (log.isWarnEnabled()) {
log.warn(String.format("Session timeout not found, using default: %d", configuration.getSessionTimeout()));
}
}
@@ -343,8 +351,8 @@ public class LoadBalancerConfiguration {
validateRequiredPropertyInNode(Constants.CONF_PROPERTY_TOPOLOGY_EVENT_LISTENER, topologyEventListenerEnabled, Constants.CONF_ELEMENT_LOADBALANCER);
configuration.setTopologyEventListenerEnabled(Boolean.parseBoolean(topologyEventListenerEnabled));
- if(configuration.isTopologyEventListenerEnabled()) {
- String topologyMemberIpType = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_TOPOLOGY_MEMBER_IP_TYPE);
+ if (configuration.isTopologyEventListenerEnabled()) {
+ String topologyMemberIpType = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_TOPOLOGY_MEMBER_IP_TYPE);
validateRequiredPropertyInNode(Constants.CONF_PROPERTY_TOPOLOGY_MEMBER_IP_TYPE, topologyMemberIpType, Constants.CONF_ELEMENT_LOADBALANCER);
configuration.setTopologyMemberIpType(transformMemberIpType(topologyMemberIpType));
}
@@ -366,7 +374,7 @@ public class LoadBalancerConfiguration {
}
String clusterFilter = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_TOPOLOGY_CLUSTER_FILTER);
if (StringUtils.isNotBlank(clusterFilter)) {
- log.info("Cluster filter::: " +clusterFilter);
+ log.info("Cluster filter::: " + clusterFilter);
configuration.setTopologyClusterFilter(clusterFilter);
}
String memberFilter = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_TOPOLOGY_MEMBER_FILTER);
@@ -410,11 +418,11 @@ public class LoadBalancerConfiguration {
throw new InvalidConfigurationException(String.format("Invalid tenant identifier regular expression: %s", tenantIdentifierRegex), e);
}
List<String> regexList = new ArrayList<String>();
- if(tenantIdentifierRegex.contains(StratosConstants.FILTER_VALUE_SEPARATOR)) {
+ if (tenantIdentifierRegex.contains(StratosConstants.FILTER_VALUE_SEPARATOR)) {
String[] regexArray;
regexArray = tenantIdentifierRegex.split(StratosConstants.FILTER_VALUE_SEPARATOR);
- for(String regex: regexArray) {
- regexList.add(regex);
+ for (String regex : regexArray) {
+ regexList.add(regex);
}
} else {
regexList.add(tenantIdentifierRegex);
@@ -433,12 +441,12 @@ public class LoadBalancerConfiguration {
}
String rewriteLocationHeader = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_REWRITE_LOCATION_HEADER);
- if(StringUtils.isNotEmpty(rewriteLocationHeader)) {
+ if (StringUtils.isNotEmpty(rewriteLocationHeader)) {
configuration.setRewriteLocationHeader(Boolean.parseBoolean(topologyEventListenerEnabled));
}
String mapDomainNames = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_MAP_DOMAIN_NAMES);
- if(StringUtils.isNotEmpty(mapDomainNames)) {
+ if (StringUtils.isNotEmpty(mapDomainNames)) {
configuration.setDomainMappingEnabled(Boolean.parseBoolean(mapDomainNames));
}
@@ -446,24 +454,26 @@ public class LoadBalancerConfiguration {
Node servicesNode = loadBalancerNode.findChildNodeByName(Constants.CONF_ELEMENT_SERVICES);
validateRequiredNode(servicesNode, Constants.CONF_ELEMENT_SERVICES);
+ TopologyProvider topologyProvider = new TopologyProvider();
for (Node serviceNode : servicesNode.getChildNodes()) {
- ServiceType serviceType = ServiceType.SingleTenant;
+ boolean isMultiTenant = false;
String multiTenant = serviceNode.getProperty(Constants.CONF_PROPERTY_MULTI_TENANT);
if (StringUtils.isNotBlank(multiTenant) && (Boolean.parseBoolean(multiTenant))) {
- serviceType = ServiceType.MultiTenant;
+ isMultiTenant = true;
}
- Service service = new Service(serviceNode.getName(), serviceType);
+
+ String serviceName = serviceNode.getName();
Node clustersNode = serviceNode.findChildNodeByName(Constants.CONF_ELEMENT_CLUSTERS);
for (Node clusterNode : clustersNode.getChildNodes()) {
String clusterId = clusterNode.getName();
- Cluster cluster = new Cluster(service.getServiceName(), clusterId, null, null, null);
+ Cluster cluster = new Cluster(serviceName, clusterId);
String tenantRange = clusterNode.getProperty(Constants.CONF_PROPERTY_TENANT_RANGE);
if (StringUtils.isNotBlank(tenantRange)) {
- if (service.getServiceType() != ServiceType.MultiTenant) {
+ if (!isMultiTenant) {
throw new InvalidConfigurationException(String.format("%s property is not valid for non multi-tenant service cluster: [service] %s [cluster] %s",
- Constants.CONF_PROPERTY_TENANT_RANGE, service.getServiceName(), cluster.getClusterId()));
+ Constants.CONF_PROPERTY_TENANT_RANGE, serviceName, cluster.getClusterId()));
}
cluster.setTenantRange(tenantRange);
}
@@ -488,14 +498,10 @@ public class LoadBalancerConfiguration {
String memberId = memberNode.getName();
// Init time is set to -1 because it is not used for static load balancer configuration
long initTime = -1;
- Member member = new Member(cluster.getServiceName(), cluster.getClusterId(), memberId,
- memberId, Constants.STATIC_NETWORK_PARTITION, Constants.STATIC_PARTITION, initTime);
String ip = memberNode.getProperty(Constants.CONF_PROPERTY_IP);
validateRequiredPropertyInNode(Constants.CONF_PROPERTY_IP, ip, String.format("member %s", memberId));
- List<String> memberPrivateIPs = new ArrayList<String>();
- memberPrivateIPs.add(ip);
- member.setMemberPrivateIPs(memberPrivateIPs);
- member.setDefaultPrivateIP(ip);
+ Member member = new Member(cluster.getServiceName(), cluster.getClusterId(), memberId, ip);
+
Node portsNode = memberNode.findChildNodeByName(Constants.CONF_ELEMENT_PORTS);
validateRequiredNode(portsNode, Constants.CONF_ELEMENT_PORTS, String.format("member %s", memberId));
@@ -509,29 +515,13 @@ public class LoadBalancerConfiguration {
Port port = new Port(portNode.getName(), Integer.valueOf(value), Integer.valueOf(proxy));
member.addPort(port);
}
- member.setStatus(MemberStatus.Active);
cluster.addMember(member);
}
// Add cluster to service
- service.addCluster(cluster);
-
- // Add service to topology manager if not exists
- try {
- // TODO - fix properly!
- // this lock is not needed since, this Topology is not shared. This is
- // used by LB only
- //TopologyManager.acquireWriteLock();
- if (!TopologyManager.getTopology().serviceExists(service.getServiceName())) {
- TopologyManager.getTopology().addService(service);
- }
- } finally {
- //TopologyManager.releaseWriteLock();
- }
-
- // Add cluster to load balancer context
- LoadBalancerContextUtil.addClusterAgainstHostNames(cluster);
+ topologyProvider.addCluster(cluster);
}
}
+ configuration.setTopologyProvider(topologyProvider);
}
return configuration;
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/c799abce/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java
index 895d0d9..0eec59e 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java
@@ -32,41 +32,25 @@ public class LoadBalancerContext {
private static volatile LoadBalancerContext instance;
- // Following map is updated by the service component.
- // Map<TenantId, SynapseEnvironmentService>
- private TenantIdSynapseEnvironmentServiceMap tenantIdSynapseEnvironmentServiceMap;
-
- // Following maps are updated on demand by the request delegator.
- // Map<ServiceName, ServiceContext>
- private ServiceNameServiceContextMap serviceNameServiceContextMap;
// Map<ClusterId, ClusterContext>
private ClusterIdClusterContextMap clusterIdClusterContextMap;
// Following maps are updated by load balancer topology & tenant receivers.
// Map<ClusterId, Cluster>
// Keep track of all clusters
- private ClusterIdClusterMap clusterIdClusterMap;
// Map<Host/Domain-Name, Cluster>
// Keep track of all clusters
- private HostNameClusterMap hostNameClusterMap;
+
// Map<Host/Domain-Name, AppContext>
private HostNameAppContextMap hostNameAppContextMap;
// Map<HostName, Map<TenantId, Cluster>>
- // Keep track of multi-tenant service clusters
- private MultiTenantClusterMap multiTenantClusterMap;
// Map<MemberIp, Hostname>
// Keep track of cluster hostnames of all members against their IP addresses
private MemberIpHostnameMap memberIpHostnameMap;
- private boolean clustered;
private LoadBalancerContext() {
- tenantIdSynapseEnvironmentServiceMap = new TenantIdSynapseEnvironmentServiceMap();
- serviceNameServiceContextMap = new ServiceNameServiceContextMap();
clusterIdClusterContextMap = new ClusterIdClusterContextMap();
- clusterIdClusterMap = new ClusterIdClusterMap();
- hostNameClusterMap = new HostNameClusterMap();
hostNameAppContextMap = new HostNameAppContextMap();
- multiTenantClusterMap = new MultiTenantClusterMap();
memberIpHostnameMap = new MemberIpHostnameMap();
}
@@ -82,49 +66,20 @@ public class LoadBalancerContext {
}
public void clear() {
- tenantIdSynapseEnvironmentServiceMap.clear();
- serviceNameServiceContextMap.clear();
clusterIdClusterContextMap.clear();
- multiTenantClusterMap.clear();
- }
-
- public TenantIdSynapseEnvironmentServiceMap getTenantIdSynapseEnvironmentServiceMap() {
- return tenantIdSynapseEnvironmentServiceMap;
- }
-
- public ServiceNameServiceContextMap getServiceNameServiceContextMap() {
- return serviceNameServiceContextMap;
+ hostNameAppContextMap.clear();
+ memberIpHostnameMap.clear();
}
public ClusterIdClusterContextMap getClusterIdClusterContextMap() {
return clusterIdClusterContextMap;
}
- public ClusterIdClusterMap getClusterIdClusterMap() {
- return clusterIdClusterMap;
- }
-
- public HostNameClusterMap getHostNameClusterMap() {
- return hostNameClusterMap;
- }
-
public HostNameAppContextMap getHostNameContextPathMap() {
return hostNameAppContextMap;
}
- public MultiTenantClusterMap getMultiTenantClusterMap() {
- return multiTenantClusterMap;
- }
-
public MemberIpHostnameMap getMemberIpHostnameMap() {
return memberIpHostnameMap;
}
-
- public boolean isClustered() {
- return clustered;
- }
-
- public void setClustered(boolean clustered) {
- this.clustered = clustered;
- }
}