You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2013/07/05 09:18:41 UTC
[3/3] git commit: Refactoring cartridge agent and messages component
Refactoring cartridge agent and messages component
Signed-off-by: Lakmal Warusawithana <la...@wso2.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/d6b584f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/d6b584f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/d6b584f6
Branch: refs/heads/master
Commit: d6b584f66e443ac3c2dcfa551c6b43c92cee8d54
Parents: abe070e
Author: Udara Liyanage <ud...@wso2.com>
Authored: Fri Jul 5 10:53:26 2013 +0530
Committer: Lakmal Warusawithana <la...@wso2.com>
Committed: Fri Jul 5 12:46:07 2013 +0530
----------------------------------------------------------------------
.../2.1.3/pom.xml | 128 +++++++++
.../cartridge/agent/ClusteringClient.java | 270 +++++++++++++++++++
.../cartridge/agent/ELBMembershipListener.java | 83 ++++++
.../InstanceStateNotificationClientThread.java | 63 +++++
.../apache/stratos/cartridge/agent/Main.java | 158 +++++++++++
.../agent/RegistrantMembershipListener.java | 71 +++++
.../exception/CartridgeAgentException.java | 41 +++
.../agent/internal/CartridgeAgentConstants.java | 28 ++
.../CartridgeAgentServiceComponent.java | 142 ++++++++++
.../cartridge/agent/internal/DataHolder.java | 45 ++++
.../cartridge/agent/registrant/PortMapping.java | 90 +++++++
.../cartridge/agent/registrant/Registrant.java | 199 ++++++++++++++
.../agent/registrant/RegistrantDatabase.java | 143 ++++++++++
.../registrant/RegistrantHealthChecker.java | 115 ++++++++
.../agent/registrant/RegistrantUtil.java | 177 ++++++++++++
.../agent/service/CartridgeAgentService.java | 68 +++++
.../src/main/resources/META-INF/services.xml | 37 +++
.../2.1.1/pom.xml | 78 ++++++
.../cartridge/messages/ClusterDomain.java | 117 ++++++++
.../messages/ClusterDomainManager.java | 35 +++
.../messages/CreateClusterDomainMessage.java | 129 +++++++++
.../CreateRemoveClusterDomainMessage.java | 77 ++++++
components/pom.xml | 5 +-
.../2.1.3/pom.xml | 128 ---------
.../cartridge/agent/ClusteringClient.java | 270 -------------------
.../cartridge/agent/ELBMembershipListener.java | 83 ------
.../InstanceStateNotificationClientThread.java | 63 -----
.../apache/stratos/cartridge/agent/Main.java | 158 -----------
.../agent/RegistrantMembershipListener.java | 71 -----
.../exception/CartridgeAgentException.java | 41 ---
.../agent/internal/CartridgeAgentConstants.java | 28 --
.../CartridgeAgentServiceComponent.java | 142 ----------
.../cartridge/agent/internal/DataHolder.java | 45 ----
.../cartridge/agent/registrant/PortMapping.java | 90 -------
.../cartridge/agent/registrant/Registrant.java | 199 --------------
.../agent/registrant/RegistrantDatabase.java | 143 ----------
.../registrant/RegistrantHealthChecker.java | 115 --------
.../agent/registrant/RegistrantUtil.java | 177 ------------
.../agent/service/CartridgeAgentService.java | 68 -----
.../src/main/resources/META-INF/services.xml | 37 ---
.../2.1.1/pom.xml | 78 ------
.../cartridge/messages/ClusterDomain.java | 117 --------
.../messages/ClusterDomainManager.java | 35 ---
.../messages/CreateClusterDomainMessage.java | 129 ---------
.../CreateRemoveClusterDomainMessage.java | 77 ------
components/stratos/cartridge-agent/pom.xml | 124 ---------
46 files changed, 2297 insertions(+), 2420 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/pom.xml b/components/org.apache.stratos.cartridge.agent/2.1.3/pom.xml
new file mode 100644
index 0000000..d8b8de9
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/pom.xml
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ ~
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <parent>
+ <groupId>org.apache.stratos</groupId>
+ <artifactId>cartridge-agent-parent</artifactId>
+ <version>2.0.0</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>org.apache.stratos.cartridge.agent</artifactId>
+ <packaging>bundle</packaging>
+ <version>2.1.3</version>
+ <name>Apache Stratos Cartridge Agent - agent</name>
+
+    <!-- Compile-time dependencies of the cartridge agent bundle. -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.stratos</groupId>
+            <artifactId>org.apache.stratos.cartridge.messages</artifactId>
+            <version>2.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.axis2.wso2</groupId>
+            <artifactId>axis2</artifactId>
+            <version>${axis2.wso2.version}</version>
+        </dependency>
+        <!-- No version given here; presumably managed by the parent POM (verify). -->
+        <dependency>
+            <groupId>org.apache.ws.commons.axiom.wso2</groupId>
+            <artifactId>axiom</artifactId>
+        </dependency>
+        <!-- No version given here; presumably managed by the parent POM (verify). -->
+        <dependency>
+            <groupId>commons-logging</groupId>
+            <artifactId>commons-logging</artifactId>
+        </dependency>
+        <!-- Declared once only: the original listed this artifact twice with the same version. -->
+        <dependency>
+            <groupId>org.wso2.carbon</groupId>
+            <artifactId>org.wso2.carbon.adc.instanceinfo.mgt.stub</artifactId>
+            <version>4.1.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.osgi</groupId>
+            <artifactId>org.eclipse.osgi</artifactId>
+            <version>3.5.0.v20090311-1300</version>
+        </dependency>
+        <dependency>
+            <groupId>org.eclipse.osgi</groupId>
+            <artifactId>org.eclipse.osgi.services</artifactId>
+            <version>3.3.0.v20110513</version>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.carbon</groupId>
+            <artifactId>org.wso2.carbon.utils</artifactId>
+            <version>4.1.0</version>
+        </dependency>
+    </dependencies>
+
+ <!-- OSGi bundle packaging: the bundle plugin instructions below declare the private, exported and imported packages. -->
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <version>1.4.0</version>
+ <extensions>true</extensions>
+
+ <configuration>
+ <instructions>
+ <Bundle-SymbolicName>${project.artifactId}</Bundle-SymbolicName>
+ <Bundle-Name>${project.artifactId}</Bundle-Name>
+ <!-- NOTE(review): the negated service.* pattern follows the internal.* include; confirm it has the intended effect. -->
+ <Private-Package>
+ org.apache.stratos.cartridge.agent.internal.*,
+ !org.apache.stratos.cartridge.agent.service.*
+ </Private-Package>
+ <!-- NOTE(review): agent.* appears to also cover the internal.* packages listed as private above; confirm the overlap is intended. -->
+ <Export-Package>
+ org.apache.stratos.cartridge.agent.service.*,
+ org.apache.stratos.cartridge.agent.client.*,
+ org.apache.stratos.cartridge.agent.registrant.*,
+ org.apache.stratos.cartridge.agent.exception.*,
+ org.apache.stratos.cartridge.agent.*
+ </Export-Package>
+ <!-- Versioned imports come first; every package not listed explicitly is imported as optional. -->
+ <Import-Package>
+ org.apache.axis2.*; version="${axis2.osgi.version.range}",
+ org.apache.axiom.*;
+ version="${axiom.osgi.version.range}",
+ org.apache.neethi.*;
+ version="${neethi.osgi.version.range.service-mgt}",
+ javax.xml.stream.*; version="1.0.1",
+ javax.wsdl.*; version="1.6.2",
+ org.osgi.framework.*,
+ *;resolution:=optional
+ </Import-Package>
+ <DynamicImport-Package>*</DynamicImport-Package>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/ClusteringClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/ClusteringClient.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/ClusteringClient.java
new file mode 100644
index 0000000..b81b542
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/ClusteringClient.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent;
+
+import org.apache.axiom.om.impl.builder.StAXOMBuilder;
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.clustering.*;
+import org.apache.axis2.clustering.tribes.TribesClusteringAgent;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.description.Parameter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.exception.CartridgeAgentException;
+import org.apache.stratos.cartridge.agent.registrant.PortMapping;
+import org.apache.stratos.cartridge.agent.registrant.Registrant;
+import org.apache.stratos.cartridge.agent.registrant.RegistrantDatabase;
+import org.apache.stratos.cartridge.agent.registrant.RegistrantUtil;
+import org.apache.stratos.cartridge.messages.CreateClusterDomainMessage;
+import org.apache.stratos.cartridge.messages.CreateRemoveClusterDomainMessage;
+
+import javax.xml.stream.XMLStreamException;
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * This class is used for all Axis2 clustering related activities such as joining the load balancer
+ * & joining registrants to clustering groups.
+ */
+public class ClusteringClient {
+ // Shared logger for this class.
+ private static final Log log = LogFactory.getLog(ClusteringClient.class);
+ // Source of the random offset that joinGroup adds to the configured local member port.
+ public static final Random RANDOM = new Random();
+ // Agent configuration; assigned in init() and read when clustering agents are created.
+ private Properties conf;
+ // Registrant store; joinGroup checks it for an active duplicate and adds the new registrant to it.
+ private RegistrantDatabase registrantDatabase;
+ // Clustering agent for the load balancer group; created in joinLoadBalancer().
+ private ClusteringAgent loadBalancerAgent;
+
+ /**
+  * Creates a client backed by the given registrant database. init() must be called
+  * before the client is used, because that is where the configuration and the load
+  * balancer agent are set.
+  *
+  * @param registrantDatabase database that keeps track of registrants
+  */
+ public ClusteringClient(RegistrantDatabase registrantDatabase) {
+ this.registrantDatabase = registrantDatabase;
+ }
+
+    /**
+     * Stores the agent configuration and joins the load balancer cluster group.
+     *
+     * @param conf                 agent configuration properties
+     * @param configurationContext Axis2 configuration context handed to the clustering agent
+     * @param membershipListener   listener registered on the load balancer agent
+     * @throws CartridgeAgentException if joining the load balancer group fails for any reason
+     */
+    public void init(Properties conf,
+                     ConfigurationContext configurationContext,
+                     MembershipListener membershipListener) throws CartridgeAgentException {
+        // A plain field assignment cannot fail, so it does not need to sit inside the try block.
+        this.conf = conf;
+        try {
+            joinLoadBalancer(configurationContext, membershipListener);
+        } catch (Exception cause) {
+            final String failure = "Cannot initialize ClusteringClient";
+            log.error(failure, cause);
+            throw new CartridgeAgentException(failure, cause);
+        }
+    }
+
+    /**
+     * Creates the clustering agent for the domain named by the "loadBalancerDomain"
+     * property, registers the given membership listener on it and initializes it.
+     *
+     * @param configurationContext Axis2 configuration context handed to the agent
+     * @param membershipListener   the only listener installed on the load balancer agent
+     * @throws CartridgeAgentException if any step fails; the cause is logged and wrapped
+     */
+    private void joinLoadBalancer(ConfigurationContext configurationContext,
+                                  MembershipListener membershipListener)
+            throws CartridgeAgentException {
+
+        try {
+            String lbDomain = conf.getProperty("loadBalancerDomain");
+            loadBalancerAgent = createClusteringAgent(configurationContext, lbDomain);
+
+            // Replaces the listeners installed by createClusteringAgent with just this one.
+            List<MembershipListener> listeners = new ArrayList<MembershipListener>();
+            listeners.add(membershipListener);
+            TribesClusteringAgent tribesAgent = (TribesClusteringAgent) loadBalancerAgent;
+            tribesAgent.setMembershipListeners(listeners);
+
+            loadBalancerAgent.init();
+        } catch (Exception cause) {
+            final String failure = "Cannot join LB group";
+            log.error(failure, cause);
+            throw new CartridgeAgentException(failure, cause);
+        }
+    }
+
+    /**
+     * Join a cluster group of the Elastic Load Balancer.
+     * <p>
+     * The registrant is added to the registrant database first. If its health check fails,
+     * or if the load balancer returns no clustering commands for the create-domain message,
+     * the method returns without starting the registrant.
+     *
+     * @param registrant           Registrant
+     * @param configurationContext ConfigurationContext
+     * @throws CartridgeAgentException If an active registrant with the same key already exists,
+     *                                 or an error occurs while joining a cluster group
+     */
+    public void joinGroup(Registrant registrant,
+                          ConfigurationContext configurationContext) throws CartridgeAgentException {
+        if (registrantDatabase.containsActive(registrant)) {
+            throw new CartridgeAgentException("Active registrant with key " +
+                                              registrant.getKey() + " already exists");
+        }
+        registrantDatabase.add(registrant);
+
+        if (!RegistrantUtil.isHealthy(registrant)) {
+            // Deliberately not an exception: the registrant stays in the database and the
+            // caller is not failed; only the join is skipped.
+            String msg = "Couldn't add registrant " + registrant + " due to a health check failure";
+            log.error(msg);
+            return;
+        }
+        try {
+            List<ClusteringCommand> clusteringCommands =
+                    loadBalancerAgent.sendMessage(new CreateClusterDomainMessage(registrant.getService(),
+                                                                                 registrant.retrieveClusterDomain(),
+                                                                                 registrant.getHostName(),
+                                                                                 registrant.getTenantRange(),
+                                                                                 registrant.getMinInstanceCount(),
+                                                                                 registrant.getMaxInstanceCount(),
+                                                                                 registrant.getMaxRequestsPerSecond(),
+                                                                                 registrant.getRoundsToAverage(),
+                                                                                 registrant.getAlarmingUpperRate(),
+                                                                                 registrant.getAlarmingLowerRate(),
+                                                                                 registrant.getScaleDownFactor()),
+                                                  true);
+            if (clusteringCommands == null || clusteringCommands.isEmpty()) {
+                // No response from the load balancer group: nothing to join.
+                return;
+            }
+            for (ClusteringCommand clusteringCommand : clusteringCommands) {
+                clusteringCommand.execute(configurationContext);
+            }
+        } catch (ClusteringFault e) {
+            handleException("Cannot send CreateClusterDomainMessage to ELB", e);
+        }
+
+        ClusteringAgent agent;
+        try {
+            agent = createClusteringAgent(configurationContext,
+                                          registrant.retrieveClusterDomain());
+        } catch (ClusteringFault e) {
+            handleException("Cannot create ClusteringAgent for registrant", e);
+            return; // unreachable (handleException always throws); needed for definite assignment
+        }
+
+        Parameter propsParam = new Parameter();
+        propsParam.setName("properties");
+        String propertiesPayload = buildMemberPropertiesPayload(registrant);
+        try {
+            StAXOMBuilder builder =
+                    new StAXOMBuilder(new ByteArrayInputStream(propertiesPayload.getBytes()));
+            propsParam.setParameterElement(builder.getDocumentElement());
+            agent.addParameter(propsParam);
+        } catch (XMLStreamException e) {
+            handleException("Cannot create properties ClusteringAgent parameter", e);
+        } catch (AxisFault e) {
+            // Not expected to occur; logged instead of silently dropped so that a member
+            // started without its properties can be diagnosed.
+            log.warn("Cannot add properties parameter to ClusteringAgent of registrant " +
+                     registrant, e);
+        }
+
+        // Local member port = configured base port + a random offset in [27, 5026].
+        int newMemberPort = Integer.parseInt(conf.getProperty("clustering.localMemberPort")) +
+                            RANDOM.nextInt(5000) + 27;
+        addParameter(agent, "localMemberPort", newMemberPort + "");
+        try {
+            synchronized (registrant) {
+                if (!registrant.running()) {
+                    registrant.start(agent);
+                }
+            }
+        } catch (ClusteringFault e) {
+            handleException("Cannot start registrant", e);
+        }
+
+        // Update instance state in stratos database, with active
+        new Thread(new InstanceStateNotificationClientThread(registrant, "ACTIVE")).start();
+    }
+
+    /**
+     * Builds the XML for the "properties" parameter of a registrant's clustering agent:
+     * one port.mapping.&lt;proxyPort&gt; entry per port mapping, followed by httpPort,
+     * httpsPort, remoteHost and subDomain. For example:
+     * <pre>
+     * &lt;parameter name="properties"&gt;
+     *     &lt;property name="port.mapping.8281" value="9768"/&gt;
+     *     &lt;property name="port.mapping.8244" value="9448"/&gt;
+     * &lt;/parameter&gt;
+     * </pre>
+     * httpPort and httpsPort are -1 when the registrant has no mapping of that type.
+     */
+    private String buildMemberPropertiesPayload(Registrant registrant) {
+        StringBuilder propertiesPayload = new StringBuilder("<parameter name=\"properties\">");
+        int httpPort = -1;
+        int httpsPort = -1;
+        for (PortMapping portMapping : registrant.getPortMappings()) {
+            // The name must be "port.mapping.<proxyPort>"; the earlier "portMapping.mapping."
+            // prefix did not match the documented property format.
+            propertiesPayload.append("<property name=\"port.mapping.").append(portMapping.getProxyPort()).
+                    append("\" value=\"").append(portMapping.getPrimaryPort()).append("\" />");
+            if (portMapping.getType().equals(PortMapping.PORT_TYPE_HTTP)) {
+                httpPort = portMapping.getPrimaryPort();
+            } else if (portMapping.getType().equals(PortMapping.PORT_TYPE_HTTPS)) {
+                httpsPort = portMapping.getPrimaryPort();
+            }
+        }
+
+        String remoteHost = registrant.getRemoteHost();
+        propertiesPayload.append("<property name=\"httpPort\" value=\"").append(httpPort).append("\" />");
+        propertiesPayload.append("<property name=\"httpsPort\" value=\"").append(httpsPort).append("\" />");
+        propertiesPayload.append("<property name=\"remoteHost\" value=\"").append(remoteHost).append("\" />");
+        propertiesPayload.append("<property name=\"subDomain\" value=\"__$default\" />");
+        propertiesPayload.append("</parameter>");
+        return propertiesPayload.toString();
+    }
+
+
+    /**
+     * Asks the load balancer group to remove a cluster domain and executes every
+     * clustering command it sends back.
+     *
+     * @param domain               cluster domain to remove
+     * @param subDomain            sub domain of the cluster domain
+     * @param hostName             host name associated with the domain
+     * @param configurationContext context in which the returned commands are executed
+     * @throws CartridgeAgentException if the message cannot be sent or a command fails
+     */
+    public void removeClusterDomain(String domain,
+                                    String subDomain,
+                                    String hostName,
+                                    ConfigurationContext configurationContext) throws CartridgeAgentException {
+        try {
+            List<ClusteringCommand> clusteringCommands =
+                    loadBalancerAgent.sendMessage(new CreateRemoveClusterDomainMessage(domain, subDomain, hostName),
+                                                  true);
+            if (clusteringCommands == null) {
+                return;
+            }
+            // An empty response list simply results in no commands being executed.
+            for (ClusteringCommand clusteringCommand : clusteringCommands) {
+                clusteringCommand.execute(configurationContext);
+            }
+        } catch (ClusteringFault e) {
+            // The message names the message type that is actually sent here.
+            handleException("Cannot send CreateRemoveClusterDomainMessage to ELB", e);
+        }
+    }
+
+    /**
+     * Logs the failure at error level and rethrows it wrapped in a
+     * CartridgeAgentException. This method never returns normally.
+     *
+     * @param msg description of what failed
+     * @param e   underlying cause
+     * @throws CartridgeAgentException always
+     */
+    private void handleException(String msg, Exception e) throws CartridgeAgentException {
+        CartridgeAgentException wrapped = new CartridgeAgentException(msg, e);
+        log.error(msg, e);
+        throw wrapped;
+    }
+
+    /**
+     * Builds a TribesClusteringAgent for the given domain from the agent configuration.
+     * Every "clustering.&lt;name&gt;" property becomes an agent parameter called
+     * &lt;name&gt;, and the members are read from "members.&lt;i&gt;.host" /
+     * "members.&lt;i&gt;.port" starting at i = 1 until either entry is missing.
+     *
+     * @param configurationContext Axis2 configuration context set on the agent
+     * @param clusterDomain        value of the agent's "domain" parameter
+     * @return the configured agent
+     * @throws ClusteringFault declared by the original signature
+     */
+    private ClusteringAgent createClusteringAgent(ConfigurationContext configurationContext,
+                                                  String clusterDomain) throws ClusteringFault {
+        final String clusteringPrefix = "clustering.";
+        TribesClusteringAgent tribesAgent = new TribesClusteringAgent();
+        addParameter(tribesAgent, "AvoidInitiation", "true");
+        for (String propertyName : conf.stringPropertyNames()) {
+            if (!propertyName.startsWith(clusteringPrefix)) {
+                continue;
+            }
+            // Strip the "clustering." prefix to get the agent parameter name.
+            String parameterName = propertyName.substring(clusteringPrefix.length());
+            addParameter(tribesAgent, parameterName, conf.getProperty(propertyName));
+        }
+
+        List<Member> wellKnownMembers = new ArrayList<Member>();
+        int index = 1;
+        while (index < Integer.MAX_VALUE) {
+            String memberHost = conf.getProperty("members." + index + ".host");
+            String memberPort = conf.getProperty("members." + index + ".port");
+            if (memberHost == null || memberPort == null) {
+                break;
+            }
+            wellKnownMembers.add(new Member(memberHost, Integer.parseInt(memberPort)));
+            index++;
+        }
+        tribesAgent.setMembers(wellKnownMembers);
+
+        // Set after the clustering.* loop, so this value wins over any "clustering.domain" property.
+        addParameter(tribesAgent, "domain", clusterDomain);
+        tribesAgent.setConfigurationContext(configurationContext);
+
+        List<MembershipListener> listeners = new ArrayList<MembershipListener>();
+        listeners.add(new RegistrantMembershipListener(this, configurationContext));
+        tribesAgent.setMembershipListeners(listeners);
+        return tribesAgent;
+    }
+
+    /**
+     * Sets a parameter on the clustering agent, replacing any existing parameter of the
+     * same name. A failure is logged as a warning and otherwise ignored, so that one bad
+     * parameter does not abort agent creation.
+     *
+     * @param agent      agent to configure
+     * @param paramName  parameter name
+     * @param paramValue parameter value
+     */
+    private static void addParameter(ClusteringAgent agent,
+                                     String paramName, String paramValue) {
+        Parameter parameter = new Parameter(paramName, paramValue);
+        try {
+            agent.removeParameter(parameter);
+            agent.addParameter(parameter);
+        } catch (AxisFault e) {
+            // Previously swallowed silently; the value is left out of the message on purpose.
+            log.warn("Cannot set clustering parameter " + paramName, e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/ELBMembershipListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/ELBMembershipListener.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/ELBMembershipListener.java
new file mode 100644
index 0000000..ca2cd96
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/ELBMembershipListener.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent;
+
+import org.apache.axis2.clustering.Member;
+import org.apache.axis2.clustering.MembershipListener;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.exception.CartridgeAgentException;
+import org.apache.stratos.cartridge.agent.registrant.RegistrantDatabase;
+import org.apache.stratos.cartridge.agent.registrant.RegistrantHealthChecker;
+import org.apache.stratos.cartridge.agent.registrant.RegistrantUtil;
+
+/**
+ * This membership listener will get notified when the Elastic Load Balancer (ELB) joins or
+ * leaves the cluster
+ *
+ * When the ELB leaves the cluster, we have to disconnect all Registrants, and when the ELB rejoins,
+ * we have to reconnect all the Registrants to the ELB
+ */
+public class ELBMembershipListener implements MembershipListener {
+    private static final Log log = LogFactory.getLog(ELBMembershipListener.class);
+
+    private ClusteringClient clusteringClient;
+    private ConfigurationContext configurationContext;
+    private RegistrantDatabase registrantDatabase;
+    private RegistrantHealthChecker healthChecker;
+
+    /**
+     * @param clusteringClient     client used to re-join registrants
+     * @param configurationContext Axis2 configuration context passed on when reloading
+     * @param registrantDatabase   database of registrants to reload or stop
+     * @param healthChecker        health checker told whether the ELB is running
+     */
+    public ELBMembershipListener(ClusteringClient clusteringClient,
+                                 ConfigurationContext configurationContext,
+                                 RegistrantDatabase registrantDatabase,
+                                 RegistrantHealthChecker healthChecker) {
+        this.clusteringClient = clusteringClient;
+        this.configurationContext = configurationContext;
+        this.registrantDatabase = registrantDatabase;
+        this.healthChecker = healthChecker;
+    }
+
+    /**
+     * Called when an ELB member joins: logs the event and reloads the registrants on a
+     * new thread so that the caller is not blocked.
+     */
+    public void memberAdded(Member member, boolean b) {
+        log.info("ELB Member [" + member + "] joined cluster");
+        Thread reloader = new Thread(new Runnable() {
+            public void run() {
+                reloadRegistrantsAfterDelay();
+            }
+        });
+        reloader.start();
+    }
+
+    /**
+     * Called when an ELB member leaves: marks the ELB as not running and stops all registrants.
+     */
+    public void memberDisappeared(Member member, boolean b) {
+        log.info("ELB Member [" + member + "] left cluster");
+        healthChecker.setELBRunning(false);
+        registrantDatabase.stopAll();
+    }
+
+    /**
+     * Waits five seconds, reloads all registrants and then marks the ELB as running.
+     * If the reload fails, the error is logged and the ELB is not marked as running.
+     */
+    private void reloadRegistrantsAfterDelay() {
+        try {
+            Thread.sleep(5000);
+        } catch (InterruptedException ignored) {
+            // An interrupted wait is ignored; the reload proceeds immediately.
+        }
+        try {
+            RegistrantUtil.reloadRegistrants(clusteringClient,
+                                             configurationContext,
+                                             registrantDatabase);
+            healthChecker.setELBRunning(true);
+        } catch (CartridgeAgentException e) {
+            log.error("Could not reload registrants", e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/InstanceStateNotificationClientThread.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/InstanceStateNotificationClientThread.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/InstanceStateNotificationClientThread.java
new file mode 100644
index 0000000..8d3839d
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/InstanceStateNotificationClientThread.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent;
+
+import org.apache.axis2.AxisFault;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.registrant.Registrant;
+import org.wso2.carbon.adc.instanceinfo.mgt.stub.InstanceInformationManagementServiceStub;
+
+import java.rmi.RemoteException;
+
+/**
+ * Runnable that reports a registrant's state to the InstanceInformationManagementService
+ * at the host and port given by the "adc.host" and "adc.port" system properties.
+ * Failures are logged as warnings and are not propagated.
+ */
+public class InstanceStateNotificationClientThread implements Runnable {
+
+    private static final Log log = LogFactory
+            .getLog(InstanceStateNotificationClientThread.class);
+
+    private Registrant registrant;
+    private String state;
+
+    /**
+     * @param registrant registrant whose state is reported
+     * @param state      state value sent to the service
+     */
+    public InstanceStateNotificationClientThread(Registrant registrant,
+                                                 String state) {
+        this.registrant = registrant;
+        this.state = state;
+    }
+
+    public void run() {
+        try {
+            log.info("Instance State is updated to " + state + " "
+                     + registrant.getRemoteHost());
+            InstanceInformationManagementServiceStub stub =
+                    new InstanceInformationManagementServiceStub(adcServiceUrl());
+            stub.updateInstanceState(registrant.getRemoteHost(), 123,
+                                     registrant.retrieveClusterDomain(), "__$default",
+                                     registrant.getService(), state);
+        } catch (RemoteException e) {
+            // AxisFault is a RemoteException, so this single handler covers both of the
+            // original catch blocks, which logged the same message.
+            log.warn("Error notifying state " + state + " of registrant " + registrant + " to ADC", e);
+        }
+
+    }
+
+    /** Builds the service endpoint URL from the adc.host and adc.port system properties. */
+    private static String adcServiceUrl() {
+        String adcHost = System.getProperty("adc.host");
+        String adcPort = System.getProperty("adc.port");
+        return "https://" + adcHost
+               + ":" + adcPort
+               + "/services/InstanceInformationManagementService";
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/Main.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/Main.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/Main.java
new file mode 100644
index 0000000..f091e0b
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/Main.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.engine.AxisServer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.internal.CartridgeAgentConstants;
+import org.apache.stratos.cartridge.agent.registrant.RegistrantDatabase;
+import org.apache.stratos.cartridge.agent.registrant.RegistrantHealthChecker;
+import org.apache.stratos.cartridge.agent.service.CartridgeAgentService;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * The main class which starts up the Cartridge agent
+ */
+//public class Main {
+// private static final Log log = LogFactory.getLog(Main.class);
+// private static RegistrantHealthChecker healthChecker;
+// public static void main(String[] args) {
+// FileInputStream confFileIPStream = null;
+// try {
+// long start = System.currentTimeMillis();
+// log.info("Starting WSO2 Cartridge Agent...");
+// Properties conf = new Properties();
+// confFileIPStream = new FileInputStream("conf" + File.separator + "agent.properties");
+// conf.load(confFileIPStream);
+// RegistrantDatabase registrantDatabase = new RegistrantDatabase();
+// AxisServer axisServer = new AxisServer();
+// ConfigurationContext configurationContext = axisServer.getConfigurationContext();
+// ClusteringClient clusteringClient = new ClusteringClient(registrantDatabase);
+// configurationContext.setProperty(CartridgeAgentConstants.CLUSTERING_CLIENT, clusteringClient);
+// String healthCheckInterval = conf.getProperty("registrant.heathCheckInterval");
+// String threadPoolSize = conf.getProperty("registrant.healthCheckThreadPoolSize");
+// int healthCheckIntervalInt =
+// (healthCheckInterval == null) ? 2000 : Integer.parseInt(healthCheckInterval);
+// int threadPoolSizeInt =
+// (threadPoolSize == null) ? 10 : Integer.parseInt(threadPoolSize);
+// log.info("Registrant health check interval: " + healthCheckIntervalInt + "s");
+// healthChecker = new RegistrantHealthChecker(registrantDatabase,
+// clusteringClient,
+// configurationContext,
+// healthCheckIntervalInt,
+// threadPoolSizeInt
+// );
+// clusteringClient.init(conf,
+// configurationContext,
+// new ELBMembershipListener(clusteringClient,
+// configurationContext,
+// registrantDatabase,
+// healthChecker));
+// healthChecker.startAll();
+// axisServer.deployService(CartridgeAgentService.class.getName());
+//
+//
+// // Starting client..
+// String trustStorePath = conf.getProperty("wso2.carbon.truststore");
+// System.setProperty("javax.net.ssl.trustStore", trustStorePath);
+// System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
+// // new InstanceStateNotificationClient().notify(null, null);
+//
+// System.setProperty("adc.host", conf.getProperty("adc.host"));
+// System.setProperty("adc.port", conf.getProperty("adc.port"));
+// // ----------------------
+//
+// writePID(".");
+// Runtime.getRuntime().addShutdownHook(new Thread(){
+//
+// @Override
+// public void run() {
+// log.info("Shutting down WSO2 Cartridge Agent...");
+// }
+// });
+// log.info("Started Cartridge Agent in " + (System.currentTimeMillis() - start) + "ms");
+// } catch (Exception e) {
+// log.fatal("Could not start Cartridge Agent", e);
+// System.exit(1);
+// } finally {
+// if (confFileIPStream != null) {
+// try {
+// confFileIPStream.close();
+// } catch (IOException e) {
+// log.error("Cannot close agent.properties file", e);
+// }
+// }
+// }
+// }
+//
+// public static RegistrantHealthChecker getHealthChecker(){
+// return healthChecker;
+// }
+// /**
+// * Write the process ID of this process to the file
+// *
+// * @param carbonHome carbon.home sys property value.
+// */
+// private static void writePID(String carbonHome) {
+// byte[] bo = new byte[100];
+// String[] cmd = {"bash", "-c", "echo $PPID"};
+// Process p;
+// try {
+// p = Runtime.getRuntime().exec(cmd);
+// } catch (IOException e) {
+// //ignored. We might be invoking this on a Windows platform. Therefore if an error occurs
+// //we simply ignore the error.
+// return;
+// }
+//
+// try {
+// int bytes = p.getInputStream().read(bo);
+// } catch (IOException e) {
+// log.error(e.getMessage(), e);
+// }
+//
+// String pid = new String(bo);
+// if (pid.length() != 0) {
+// BufferedWriter out = null;
+// try {
+// FileWriter writer = new FileWriter(carbonHome + File.separator + "wso2carbon.pid");
+// out = new BufferedWriter(writer);
+// out.write(pid);
+// } catch (IOException e) {
+// log.warn("Cannot write wso2carbon.pid file");
+// } finally {
+// if (out != null) {
+// try {
+// out.close();
+// } catch (IOException ignored) {
+// }
+// }
+// }
+// }
+// }
+//}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/RegistrantMembershipListener.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/RegistrantMembershipListener.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/RegistrantMembershipListener.java
new file mode 100644
index 0000000..05ff859
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/RegistrantMembershipListener.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent;
+
+import org.apache.axis2.clustering.Member;
+import org.apache.axis2.clustering.MembershipListener;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.registrant.Registrant;
+
+/**
+ * Membership listener that logs cluster members joining and leaving the group.
+ *
+ * Removing the cluster domain of a departed member is not implemented yet; the intended
+ * logic is kept as a TODO inside {@link #memberDisappeared(Member, boolean)}.
+ */
+public class RegistrantMembershipListener implements MembershipListener{
+
+    private static final Log log = LogFactory.getLog(RegistrantMembershipListener.class);
+
+    /**
+     * Number of trailing characters dropped by {@link #getHostNameFromDomain(String)}.
+     * This is the length of ".php.domain", the suffix described in the TODO notes of
+     * {@link #memberDisappeared(Member, boolean)}.
+     */
+    private static final int DOMAIN_SUFFIX_LENGTH = ".php.domain".length();
+
+    // Neither field is read yet; both are only needed by the pending TODO in memberDisappeared().
+    private final ClusteringClient clusteringClient;
+    private final ConfigurationContext configurationContext;
+
+    /**
+     * @param clusteringClient     client kept for the pending cluster-domain removal
+     * @param configurationContext configuration context kept for the same purpose
+     */
+    public RegistrantMembershipListener(ClusteringClient clusteringClient,
+                                        ConfigurationContext configurationContext) {
+        this.clusteringClient = clusteringClient;
+        this.configurationContext = configurationContext;
+    }
+
+    /**
+     * Logs that a member joined the group.
+     *
+     * @param arg0 the member that was added
+     * @param arg1 flag supplied by the clustering framework; it is not used here
+     */
+    public void memberAdded(Member arg0, boolean arg1) {
+        log.info(" ********* Member is added to the group ... " + arg0);
+    }
+
+    /**
+     * Logs that a member left the group. No clean-up is performed yet.
+     *
+     * @param member the member that disappeared
+     * @param arg1   flag supplied by the clustering framework; it is not used here
+     */
+    public void memberDisappeared(Member member, boolean arg1) {
+        log.info(" ********** Member is removed from group ... " + member);
+
+        //HostName : cartridgeName + "." + cartridgeInfo.getHostName()
+        // sajithphpautoscale.php.slive.com.php.domain
+        //Domain : HostName.php.domain
+        // TODO remove the cluster domain of the departed member, along the lines of:
+        /* String hostName = getHostNameFromDomain(member.getDomain());
+        log.info("Host name is returned as: " + hostName);
+        try {
+            clusteringClient.removeClusterDomain(member.getDomain(), "__$default", hostName, configurationContext);
+        } catch (CartridgeAgentException e) {
+            e.printStackTrace();
+        }*/
+    }
+
+    /**
+     * Drops the last {@link #DOMAIN_SUFFIX_LENGTH} characters of a cluster domain.
+     *
+     * NOTE(review): the fixed length only fits domains ending in ".php.domain"; domains of
+     * other services would need a different suffix length - confirm before enabling the TODO.
+     *
+     * @param domain the cluster domain; may be null
+     * @return the domain without its trailing suffix, or the input unchanged when it is
+     *         null or shorter than the suffix (previously this case threw
+     *         StringIndexOutOfBoundsException / NullPointerException)
+     */
+    private String getHostNameFromDomain(String domain) {
+        if (domain == null || domain.length() < DOMAIN_SUFFIX_LENGTH) {
+            return domain;
+        }
+        return domain.substring(0, domain.length() - DOMAIN_SUFFIX_LENGTH);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/exception/CartridgeAgentException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/exception/CartridgeAgentException.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/exception/CartridgeAgentException.java
new file mode 100644
index 0000000..40e38e4
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/exception/CartridgeAgentException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.exception;
+
+/**
+ * Checked exception used by the Cartridge Agent code to report failures.
+ * Every constructor simply forwards its arguments to {@link Exception}.
+ */
+public class CartridgeAgentException extends Exception {
+
+    /** Creates an exception without a detail message or a cause. */
+    public CartridgeAgentException() {
+    }
+
+    /**
+     * @param cause the underlying failure
+     */
+    public CartridgeAgentException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * @param message the detail message
+     */
+    public CartridgeAgentException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param message the detail message
+     * @param cause   the underlying failure
+     */
+    public CartridgeAgentException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/CartridgeAgentConstants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/CartridgeAgentConstants.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/CartridgeAgentConstants.java
new file mode 100644
index 0000000..2bd4a99
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/CartridgeAgentConstants.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.stratos.cartridge.agent.internal;
+
+/**
+ * Constants used in the Cartridge Agent
+ */
+public final class CartridgeAgentConstants {
+ public static final String CLUSTERING_CLIENT = "clustering.client";
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/CartridgeAgentServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/CartridgeAgentServiceComponent.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/CartridgeAgentServiceComponent.java
new file mode 100644
index 0000000..b31cc7a
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/CartridgeAgentServiceComponent.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.stratos.cartridge.agent.internal;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.ClusteringClient;
+import org.apache.stratos.cartridge.agent.ELBMembershipListener;
+import org.apache.stratos.cartridge.agent.registrant.RegistrantDatabase;
+import org.apache.stratos.cartridge.agent.registrant.RegistrantHealthChecker;
+import org.osgi.service.component.ComponentContext;
+import org.wso2.carbon.utils.CarbonUtils;
+import org.wso2.carbon.utils.ConfigurationContextService;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * OSGi service component that starts the Cartridge Agent when it is activated: it loads
+ * agent.properties, creates the registrant database, clustering client and registrant
+ * health checker, and publishes the health checker through {@link DataHolder}.
+ *
+ * @scr.component name="org.wso2.carbon.cartridge.agent" immediate="true"
+ *
+ * @scr.reference name="config.context.service"
+ * interface="org.wso2.carbon.utils.ConfigurationContextService"
+ * cardinality="1..1" policy="dynamic"
+ * bind="setConfigurationContextService"
+ * unbind="unsetConfigurationContextService"
+ */
+public class CartridgeAgentServiceComponent {
+    private static final Log log = LogFactory.getLog(CartridgeAgentServiceComponent.class);
+
+    /**
+     * Boots the agent. Any failure is logged as fatal and swallowed, so activation itself
+     * never throws.
+     *
+     * @param ctx the component context (unused)
+     */
+    protected void activate(ComponentContext ctx) {
+        ConfigurationContext configurationContext = DataHolder.getServerConfigContext();
+
+        FileInputStream confFileIPStream = null;
+        try {
+            long start = System.currentTimeMillis();
+            log.info("Starting WSO2 Cartridge Agent...");
+            Properties conf = new Properties();
+            confFileIPStream = new FileInputStream(CarbonUtils.getCarbonConfigDirPath() +
+                                                   File.separator + "agent.properties");
+            conf.load(confFileIPStream);
+            // Expose every agent property as a JVM system property.
+            for (String name : conf.stringPropertyNames()) {
+                String value = conf.getProperty(name);
+                System.setProperty(name, value);
+            }
+
+            RegistrantDatabase registrantDatabase = new RegistrantDatabase();
+//            AxisServer axisServer = new AxisServer();
+//            ConfigurationContext configurationContext = axisServer.getConfigurationContext();
+            ClusteringClient clusteringClient = new ClusteringClient(registrantDatabase);
+            configurationContext.setProperty(CartridgeAgentConstants.CLUSTERING_CLIENT, clusteringClient);
+
+            // NOTE(review): the key below is spelled "heathCheckInterval" (missing 'l'); it is
+            // kept as-is because existing agent.properties files may rely on that spelling.
+            int healthCheckIntervalInt = getIntProperty(conf, "registrant.heathCheckInterval", 2000);
+            // Fixed: the pool size used to be parsed from the health check interval value.
+            int threadPoolSizeInt = getIntProperty(conf, "registrant.healthCheckThreadPoolSize", 10);
+            // NOTE(review): the message prints "s" but the default of 2000 looks like
+            // milliseconds - confirm the unit expected by RegistrantHealthChecker.
+            log.info("Registrant health check interval: " + healthCheckIntervalInt + "s");
+            RegistrantHealthChecker healthChecker = new RegistrantHealthChecker(registrantDatabase,
+                                                                                clusteringClient,
+                                                                                configurationContext,
+                                                                                healthCheckIntervalInt,
+                                                                                threadPoolSizeInt
+            );
+            clusteringClient.init(conf,
+                                  configurationContext,
+                                  new ELBMembershipListener(clusteringClient,
+                                                            configurationContext,
+                                                            registrantDatabase,
+                                                            healthChecker));
+            healthChecker.startAll();
+            DataHolder.setHealthChecker(healthChecker);
+            /*configurationContext.deployService(AxisService.createService(CartridgeAgentService.class.getName().toString(),
+            configurationContext.getAxisConfiguration()));*/
+//            axisServer.deployService(CartridgeAgentService.class.getName());
+
+
+            // Starting client..
+//            String trustStorePath = conf.getProperty("wso2.carbon.truststore");
+//            System.setProperty("javax.net.ssl.trustStore", trustStorePath);
+//            System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
+//            // new InstanceStateNotificationClient().notify(null, null);
+//
+//            System.setProperty("adc.host", conf.getProperty("adc.host"));
+//            System.setProperty("adc.port", conf.getProperty("adc.port"));
+            // ----------------------
+
+            Runtime.getRuntime().addShutdownHook(new Thread(){
+
+                @Override
+                public void run() {
+                    log.info("Shutting down WSO2 Cartridge Agent...");
+                }
+            });
+            log.info("Started Cartridge Agent in " + (System.currentTimeMillis() - start) + "ms");
+        } catch (Exception e) {
+            log.fatal("Could not start Cartridge Agent", e);
+            //System.exit(1);
+        } finally {
+            if (confFileIPStream != null) {
+                try {
+                    confFileIPStream.close();
+                } catch (IOException e) {
+                    log.error("Cannot close agent.properties file", e);
+                }
+            }
+        }
+
+    }
+
+    /**
+     * Reads an integer property.
+     *
+     * @param conf         the loaded agent properties
+     * @param key          the property name
+     * @param defaultValue value returned when the property is absent
+     * @return the parsed value, or {@code defaultValue} when the property is not set
+     * @throws NumberFormatException if the property is set but is not a valid integer
+     */
+    private static int getIntProperty(Properties conf, String key, int defaultValue) {
+        String value = conf.getProperty(key);
+        return (value == null) ? defaultValue : Integer.parseInt(value);
+    }
+
+    /**
+     * Deactivation hook; nothing is cleaned up here.
+     *
+     * @param ctx the component context (unused)
+     */
+    protected void deactivate(ComponentContext ctx) {
+    }
+
+    /**
+     * Bind method for the configuration context service: stores the server
+     * ConfigurationContext in {@link DataHolder}.
+     *
+     * @param contextService the service being bound
+     */
+    protected void setConfigurationContextService(ConfigurationContextService contextService) {
+        DataHolder.setServerConfigContext(contextService.getServerConfigContext());
+    }
+
+    /**
+     * Unbind method for the configuration context service; the stored context is left in place.
+     *
+     * @param contextService the service being unbound
+     */
+    protected void unsetConfigurationContextService(ConfigurationContextService contextService) {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/DataHolder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/DataHolder.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/DataHolder.java
new file mode 100644
index 0000000..955c931
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/internal/DataHolder.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.stratos.cartridge.agent.internal;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.stratos.cartridge.agent.registrant.RegistrantHealthChecker;
+
+/**
+ * Static holder for the two objects shared inside the Cartridge Agent: the server
+ * ConfigurationContext and the RegistrantHealthChecker. Both references are plain static
+ * fields and are null until the corresponding setter has been called.
+ */
+public class DataHolder {
+    private static RegistrantHealthChecker healthChecker;
+    private static ConfigurationContext serverConfigContext;
+
+    /** @return the stored server configuration context, or null if none was set */
+    public static ConfigurationContext getServerConfigContext() {
+        return serverConfigContext;
+    }
+
+    /** @param context the server configuration context to store */
+    public static void setServerConfigContext(ConfigurationContext context) {
+        serverConfigContext = context;
+    }
+
+    /** @return the stored health checker, or null if none was set */
+    public static RegistrantHealthChecker getHealthChecker() {
+        return healthChecker;
+    }
+
+    /** @param checker the registrant health checker to store */
+    public static void setHealthChecker(RegistrantHealthChecker checker) {
+        healthChecker = checker;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/PortMapping.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/PortMapping.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/PortMapping.java
new file mode 100644
index 0000000..3d1bc8b
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/PortMapping.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.registrant;
+
+import java.io.Serializable;
+
+/**
+ * This class provides the mapping between the actual port & proxy port of a {@link org.apache.stratos.cartridge.agent.registrant.Registrant}
+ *
+ * e.g. if the Registrant is serving HTTP requests on port 9763, and the requests to that port have
+ * to be proxied via port 80 on the ELB, the <code>primaryPort</code> has to be specified as 9763
+ * & <code>proxyPort</code> has to be specified as 80.
+ *
+ * @see org.apache.stratos.cartridge.agent.registrant.Registrant
+ */
+@SuppressWarnings("unused")
+public class PortMapping implements Serializable {
+
+ private static final long serialVersionUID = 8020727939469156788L;
+ public static final String PORT_TYPE_HTTP = "http";
+ public static final String PORT_TYPE_HTTPS = "https";
+
+ private int primaryPort;
+ private int proxyPort;
+ private String type;
+
+ public PortMapping() {
+ }
+
+ public int getPrimaryPort() {
+ return primaryPort;
+ }
+
+ public int getProxyPort() {
+ return proxyPort;
+ }
+
+ public void setPrimaryPort(int primaryPort) {
+ this.primaryPort = primaryPort;
+ }
+
+ public void setProxyPort(int proxyPort) {
+ this.proxyPort = proxyPort;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ if (type.equalsIgnoreCase(PORT_TYPE_HTTP) && type.equalsIgnoreCase(PORT_TYPE_HTTPS)) {
+ throw new RuntimeException("Invalid port type " + type);
+ }
+ this.type = type;
+ }
+
+ public boolean equals (Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ PortMapping that = (PortMapping) o;
+ return (type.equalsIgnoreCase(that.type) &&
+ (primaryPort == that.primaryPort) &&
+ (proxyPort == that.proxyPort));
+ }
+
+ @Override
+ public int hashCode() {
+ return type.hashCode() +
+ Integer.toString(primaryPort).hashCode() +
+ Integer.toString(proxyPort).hashCode();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/Registrant.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/Registrant.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/Registrant.java
new file mode 100644
index 0000000..a6ed860
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/Registrant.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.registrant;
+
+import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.clustering.ClusteringFault;
+
+import java.io.Serializable;
+import java.util.Arrays;
+
+/**
+ * This class represents a process which registers itself with this Cartridge Agent
+ *
+ * Identity ({@link #equals(Object)} / {@link #hashCode()}) is based on the key alone.
+ * The clustering agent is transient, so it is not part of the serialized form.
+ */
+@SuppressWarnings("unused")
+public class Registrant implements Serializable {
+
+    private static final long serialVersionUID = 1026289225178520964L;
+
+    private String key;
+    //private int tenantId;
+    private String tenantRange;
+    private String service;
+    private String hostName;
+    private PortMapping[] portMappings;
+    private String remoteHost;
+    private transient ClusteringAgent clusteringAgent;
+    private int minInstanceCount;
+    private int maxInstanceCount;
+    private int maxRequestsPerSecond;
+    private int roundsToAverage;
+    private double alarmingUpperRate;
+    private double alarmingLowerRate;
+    private double scaleDownFactor;
+
+
+    public String getService() {
+        return service;
+    }
+
+    public String getTenantRange() {
+        return tenantRange;
+    }
+
+    public void setTenantRange(String tenantRange) {
+        this.tenantRange = tenantRange;
+    }
+
+    public void setService(String service) {
+        this.service = service;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public void setHostName(String hostName) {
+        this.hostName = hostName;
+    }
+
+    /**
+     * @return a copy of the port mappings; an empty array when none have been set
+     *         (previously this threw a NullPointerException)
+     */
+    public PortMapping[] getPortMappings() {
+        if (portMappings == null) {
+            return new PortMapping[0];
+        }
+        return Arrays.copyOf(portMappings, portMappings.length);
+    }
+
+    /**
+     * Stores a copy of the given mappings.
+     *
+     * @param portMappings the mappings to copy; null clears the stored mappings
+     *                     (previously this threw a NullPointerException)
+     */
+    public void setPortMappings(PortMapping[] portMappings) {
+        this.portMappings = (portMappings == null)
+                            ? null
+                            : Arrays.copyOf(portMappings, portMappings.length);
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public int getMinInstanceCount() {
+        return minInstanceCount;
+    }
+
+    public void setMinInstanceCount(int minInstanceCount) {
+        this.minInstanceCount = minInstanceCount;
+    }
+
+    public int getMaxInstanceCount() {
+        return maxInstanceCount;
+    }
+
+    public void setMaxInstanceCount(int maxInstanceCount) {
+        this.maxInstanceCount = maxInstanceCount;
+    }
+
+    /**
+     * @return the cluster domain, built as hostName + "." + service + ".domain"
+     */
+    public String retrieveClusterDomain() {
+        // alias.hostname.php.domain
+        return hostName+"."+service+".domain";
+    }
+
+    /**
+     * Remembers the given clustering agent and initializes it.
+     *
+     * @param clusteringAgent the agent to initialize
+     * @throws ClusteringFault if the agent's init() fails; the agent stays assigned in that
+     *                         case, so {@link #running()} keeps returning true until
+     *                         {@link #stop()} is called
+     */
+    public void start(ClusteringAgent clusteringAgent) throws ClusteringFault {
+        this.clusteringAgent = clusteringAgent;
+        clusteringAgent.init();
+    }
+
+    /** Stops the clustering agent, if there is one, and forgets it. */
+    public void stop() {
+        if (clusteringAgent != null) {
+            clusteringAgent.stop();
+        }
+        clusteringAgent = null;
+    }
+
+    /** @return true while a clustering agent is assigned to this registrant */
+    public boolean running() {
+        return clusteringAgent != null;
+    }
+
+    public String getRemoteHost() {
+        return remoteHost;
+    }
+
+    public void setRemoteHost(String remoteHost) {
+        this.remoteHost = remoteHost;
+    }
+
+    public int getMaxRequestsPerSecond() {
+        return maxRequestsPerSecond;
+    }
+
+    public void setMaxRequestsPerSecond(int maxRequestsPerSecond) {
+        this.maxRequestsPerSecond = maxRequestsPerSecond;
+    }
+
+    public int getRoundsToAverage() {
+        return roundsToAverage;
+    }
+
+    public void setRoundsToAverage(int roundsToAverage) {
+        this.roundsToAverage = roundsToAverage;
+    }
+
+    public double getAlarmingUpperRate() {
+        return alarmingUpperRate;
+    }
+
+    public void setAlarmingUpperRate(double alarmingUpperRate) {
+        this.alarmingUpperRate = alarmingUpperRate;
+    }
+
+    public double getAlarmingLowerRate() {
+        return alarmingLowerRate;
+    }
+
+    public void setAlarmingLowerRate(double alarmingLowerRate) {
+        this.alarmingLowerRate = alarmingLowerRate;
+    }
+
+    public double getScaleDownFactor() {
+        return scaleDownFactor;
+    }
+
+    public void setScaleDownFactor(double scaleDownFactor) {
+        this.scaleDownFactor = scaleDownFactor;
+    }
+
+    @Override
+    public String toString() {
+        return "Registrant{" +
+               "key='" + key + '\'' +
+               ", remoteHost='" + remoteHost + '\'' +
+               '}';
+    }
+
+    /**
+     * Two registrants are equal when they have the same non-null key. A registrant whose
+     * key has not been set is only equal to itself (previously this case threw a
+     * NullPointerException).
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Registrant that = (Registrant) o;
+        return key != null && key.equals(that.key);
+    }
+
+    @Override
+    public int hashCode() {
+        // Null-safe so that a registrant without a key can still be hashed.
+        return (key == null) ? 0 : key.hashCode();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantDatabase.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantDatabase.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantDatabase.java
new file mode 100644
index 0000000..dc511dc
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantDatabase.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.registrant;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.InstanceStateNotificationClientThread;
+import org.apache.stratos.cartridge.agent.exception.CartridgeAgentException;
+
+import java.io.*;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * This class represents a database for {@link Registrant}s. Registrants added to this database will be
+ * persisted, so that when the Cartridge Agent is restarted, the Registrants can be restored.
+ *
+ * @see Registrant
+ */
+public class RegistrantDatabase {
+ private static final Log log = LogFactory.getLog(RegistrantDatabase.class);
+
+ private List<Registrant> registrants = new CopyOnWriteArrayList<Registrant>();
+
+ public void add(Registrant registrant) throws CartridgeAgentException {
+ if (registrants.contains(registrant) && registrant.running()) {
+ throw new CartridgeAgentException("Active registrant with key " +
+ registrant.getKey() + " already exists");
+ }
+ synchronized (registrant) {
+ if (!isAlreadyAdded(registrant)) {
+ persist(registrant);
+ registrants.add(registrant);
+ log.info("Added registrant " + registrant);
+
+ } else {
+ log.info("Registrant " + registrant + "has been already added");
+ }
+ }
+ }
+
+ private void persist(Registrant registrant) throws CartridgeAgentException {
+ try {
+ ObjectOutput out = null;
+ try {
+ // Serialize to a file
+ if (!new File("registrants").exists() && !new File("registrants").mkdirs()) {
+ throw new IOException("Cannot create registrants directory");
+ }
+ out = new ObjectOutputStream(new FileOutputStream("registrants" + File.separator +
+ registrant.getKey() + ".ser"));
+ out.writeObject(registrant);
+ out.close();
+ } finally {
+ if (out != null) {
+ out.close();
+ }
+ }
+ } catch (IOException e) {
+ log.error("Could not serialize registrant " + registrant, e);
+ }
+ }
+
+ public void stopAll() {
+ for (Registrant registrant : registrants) {
+ new Thread(new InstanceStateNotificationClientThread(registrant, "INACTIVE")).start();
+ registrant.stop();
+ }
+ }
+
+ public boolean containsActive(Registrant registrant) {
+ return registrants.contains(registrant) &&
+ registrants.get(registrants.indexOf(registrant)).running();
+ }
+
+ public List<Registrant> getRegistrants() {
+ return Collections.unmodifiableList(registrants);
+ }
+
+ public boolean isAlreadyAdded(Registrant registrant) {
+
+ boolean alreadyAdded = false;
+ for (Registrant registrantFromDb : registrants) {
+ if(registrantFromDb.getRemoteHost().equals(registrant.getRemoteHost())) {
+
+ PortMapping[] portMappingofRegistrantOfDB = registrantFromDb.getPortMappings();
+ PortMapping[] portMappingofRegistrant = registrant.getPortMappings();
+
+ if(portMappingofRegistrant.length != portMappingofRegistrantOfDB.length) {
+ continue;
+
+ } else {
+ alreadyAdded = checkPortMappings(registrant, registrantFromDb);
+ }
+
+ } else {
+ continue;
+ }
+ }
+ return alreadyAdded;
+ }
+
+ private boolean checkPortMappings (Registrant newRegistrant, Registrant existingRegistrant) {
+
+ PortMapping[] portMappingsOfNewRegistrant = newRegistrant.getPortMappings();
+ PortMapping[] portMappingsOfExistingRegistrant = existingRegistrant.getPortMappings();
+
+ for (PortMapping portMappingOfNewRegistrant : portMappingsOfNewRegistrant) {
+ boolean matchFound = false;
+ for (PortMapping portMappingOfExistingRegistrant : portMappingsOfExistingRegistrant) {
+ if(portMappingOfExistingRegistrant.equals(portMappingOfNewRegistrant)) {
+ matchFound = true;
+ break;
+ }
+ }
+ if(!matchFound) {
+ return false;
+ }
+ }
+ if(log.isDebugEnabled()) {
+ log.debug("***********************************Found matching registrant for " + newRegistrant + " in the Registrant database");
+ }
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantHealthChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantHealthChecker.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantHealthChecker.java
new file mode 100644
index 0000000..2f8a227
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantHealthChecker.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.registrant;
+
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.axis2.clustering.management.GroupManagementAgent;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.ClusteringClient;
+import org.apache.stratos.cartridge.agent.InstanceStateNotificationClientThread;
+
+/**
+ * This health checker periodically checks the health of the {@link Registrant}s
+ *
+ * If a registrant is found to be unhealthy, then it is stopped. This task will also try to
+ * connect to reactivate a registrant which was previously found to be unhealthy.
+ *
+ * If the Elastic Load Balancer (ELB) is not running, this health checker will not continue with
+ * registrant health checks since it is futile to try to connect the registrants to the
+ * unavailable ELB.
+ */
+public class RegistrantHealthChecker {
+ private static final Log log = LogFactory.getLog(RegistrantHealthChecker.class);
+
+ private RegistrantDatabase database;
+ private ClusteringClient clusteringClient;
+ private ConfigurationContext configurationContext;
+ private ScheduledExecutorService scheduler;
+ private volatile boolean isELBRunning;
+ private int healthCheckInterval;
+
+ public RegistrantHealthChecker(RegistrantDatabase database,
+ ClusteringClient clusteringClient,
+ ConfigurationContext configurationContext,
+ int healthCheckInterval,
+ int threadPoolSize) {
+ this.database = database;
+ this.clusteringClient = clusteringClient;
+ this.configurationContext = configurationContext;
+ this.healthCheckInterval = healthCheckInterval;
+ scheduler = Executors.newScheduledThreadPool(threadPoolSize);
+ }
+
+ public void startAll() {
+ List<Registrant> registrants = database.getRegistrants();
+ for (Registrant registrant : registrants) {
+ scheduler.scheduleWithFixedDelay(new HealthCheckerTask(registrant), 45,
+ healthCheckInterval, TimeUnit.SECONDS);
+ if (log.isDebugEnabled()) {
+ log.debug("Started a health checker for " + registrant + " ...");
+ }
+ }
+ }
+
+ public void start(Registrant registrant){
+ scheduler.scheduleWithFixedDelay(new HealthCheckerTask(registrant), 45,
+ healthCheckInterval, TimeUnit.SECONDS);
+ if (log.isDebugEnabled()) {
+ log.debug("Added a health checker for " + registrant + " ...");
+ }
+ }
+
+ public void setELBRunning(boolean ELBRunning) {
+ isELBRunning = ELBRunning;
+ }
+
+ private final class HealthCheckerTask implements Runnable {
+ Registrant registrant;
+ public HealthCheckerTask(Registrant registrant){
+ this.registrant = registrant;
+ }
+ public void run() {
+ if(!isELBRunning){
+ return;
+ }
+ try {
+ boolean healthyRegistrant = RegistrantUtil.isHealthy(registrant);
+ if (!healthyRegistrant && registrant.running()) {
+ registrant.stop();
+ new Thread(new InstanceStateNotificationClientThread(registrant, "INACTIVE")).start();
+ log.warn("Stopped registrant " + registrant + " since it is unhealthy." );
+ } else if (healthyRegistrant && !registrant.running()) {
+ registrant.stop();
+ new Thread(new InstanceStateNotificationClientThread(registrant, "INACTIVE")).start();
+ clusteringClient.joinGroup(registrant, configurationContext);
+ log.info("Restarted registrant " + registrant + " after it became active");
+ }
+ } catch (Exception e) {
+ log.error("Error occurred while running registrant health check", e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/d6b584f6/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantUtil.java b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantUtil.java
new file mode 100644
index 0000000..60208b4
--- /dev/null
+++ b/components/org.apache.stratos.cartridge.agent/2.1.3/src/main/java/org/apache/stratos/cartridge/agent/registrant/RegistrantUtil.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cartridge.agent.registrant;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cartridge.agent.ClusteringClient;
+import org.apache.stratos.cartridge.agent.exception.CartridgeAgentException;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.net.*;
+
+/**
+ * Utility method collection for handling {@link Registrant}s
+ *
+ * @see Registrant
+ */
+public class RegistrantUtil {
+ private static final Log log = LogFactory.getLog(RegistrantUtil.class);
+ private static boolean registrantsReloaded = false;
+ /**
+ * Before adding a member, we will try to verify whether we can connect to it
+ *
+ * @param registrant The member whose connectvity needs to be verified
+ * @return true, if the member can be contacted; false, otherwise.
+ */
+ public static boolean isHealthy(Registrant registrant) {
+ if (log.isDebugEnabled()) {
+ log.debug("Trying to connect to registrant " + registrant + "...");
+ }
+ if(log.isDebugEnabled()) {
+ log.debug("***********************************************isHealthy() method for registrant " + registrant);
+ }
+ String registrantRemoteHost = registrant.getRemoteHost();
+ log.debug("remote host : " + registrantRemoteHost);
+ if(registrantRemoteHost == null){
+ registrantRemoteHost = "localhost";
+ }
+ InetAddress addr;
+ try {
+ addr = InetAddress.getByName(registrantRemoteHost);
+ if(log.isDebugEnabled()) {
+ log.debug("***********************************************Host resolved for registrant " + registrant);
+ }
+ } catch (UnknownHostException e) {
+ log.error("Registrant " + registrant + " is unhealthy");
+ return false;
+ }
+ PortMapping[] portMappings = registrant.getPortMappings();
+
+ int maxRetries = Integer.parseInt(System.getProperty("clustering.maxRetries"));
+
+ for (int retries = maxRetries; retries > 0; retries--) {
+ try {
+ for (PortMapping portMapping : portMappings) {
+ int port = portMapping.getPrimaryPort();
+ if(log.isDebugEnabled()) {
+ log.debug("***********************************************primary port" + port + " registrant " + registrant);
+ }
+ if (port != -1 && port != 0) {
+ if(log.isDebugEnabled()) {
+ log.debug("***********************************************connecting to " + registrant +
+ " re-try: " + retries);
+ }
+ SocketAddress httpSockaddr = new InetSocketAddress(addr, port);
+ new Socket().connect(httpSockaddr, 10000);
+ if(log.isDebugEnabled()) {
+ log.debug("***********************************************connected successfully to port: " + port);
+ }
+ }
+ }
+ return true;
+ } catch (IOException e) {
+ if (log.isDebugEnabled()) {
+ log.debug("Error occurred.. " + e.getMessage());
+ }
+ String msg = e.getMessage();
+ if (!msg.contains("Connection refused") && !msg.contains("connect timed out")) {
+ String msg2 = "Cannot connect to registrant " + registrant;
+ log.error(msg2, e);
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Reload all the registrants persisted in the file system
+ * @param clusteringClient ClusteringClient
+ * @param configurationContext ConfigurationContext
+ * @param registrantDatabase RegistrantDatabase
+ * @throws CartridgeAgentException If reloading registrants fails
+ */
+ public static void reloadRegistrants(ClusteringClient clusteringClient,
+ ConfigurationContext configurationContext,
+ RegistrantDatabase registrantDatabase) throws CartridgeAgentException {
+
+ if(registrantsReloaded) {
+ log.info("Registrants already re-loaded, therefore not re-loading again");
+ return;
+ }
+
+ File registrants = new File("registrants");
+ if (!registrants.exists()) {
+ log.info("Registrant information doesn't exist in the file system");
+ return;
+ }
+ File[] files = registrants.listFiles();
+ if (files == null) {
+ log.error("Directory 'Registrants' is invalid");
+ return;
+
+ } else if (files.length == 0) {
+ log.info("No registrant information found in the Registrants directory");
+ return;
+ }
+
+ for (File file : files) {
+ try {
+ Registrant registrant =
+ deserializeRegistrant("registrants" + File.separator + file.getName());
+ if (!registrantDatabase.containsActive(registrant)) {
+ clusteringClient.joinGroup(registrant, configurationContext);
+ }
+ } catch (IOException e) {
+ log.error("Cannot deserialize registrant file " + file.getName(), e);
+ }
+ }
+ registrantsReloaded = true;
+ }
+
+ private static Registrant deserializeRegistrant(String fileName) throws IOException {
+ Registrant registrant = null;
+ ObjectInputStream in = null;
+
+ try {
+ // Deserialize from a file
+ File file = new File(fileName);
+ in = new ObjectInputStream(new FileInputStream(file));
+ // Deserialize the object
+ registrant = (Registrant) in.readObject();
+ } catch (ClassNotFoundException ignored) {
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ }
+ return registrant;
+ }
+
+
+}