You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2015/09/10 20:00:03 UTC
stratos git commit: Optimizing kubernetes service creation logic
Repository: stratos
Updated Branches:
refs/heads/stratos-4.1.x 81fe08c22 -> 0248d2068
Optimizing kubernetes service creation logic
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/0248d206
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/0248d206
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/0248d206
Branch: refs/heads/stratos-4.1.x
Commit: 0248d206848f7aa3984fc3eab5215dce29dab7cc
Parents: 81fe08c
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Sep 10 23:29:11 2015 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Sep 10 23:29:51 2015 +0530
----------------------------------------------------------------------
.../cloud/controller/domain/ClusterContext.java | 19 +++-
.../kubernetes/KubernetesClusterContext.java | 69 +++++++----
.../iaases/kubernetes/KubernetesIaas.java | 114 ++++++++++---------
.../messaging/topology/TopologyBuilder.java | 9 +-
4 files changed, 124 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/0248d206/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/ClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/ClusterContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/ClusterContext.java
index 09b987d..a559a1e 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/ClusterContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/ClusterContext.java
@@ -23,7 +23,9 @@ import org.apache.stratos.common.Properties;
import org.apache.stratos.messaging.domain.topology.KubernetesService;
import java.io.Serializable;
-import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
/**
* Holds runtime data of a Cluster.
@@ -44,12 +46,13 @@ public class ClusterContext implements Serializable {
// on an unregistration.
private long timeoutInMillis;
private Properties properties;
- private List<KubernetesService> kubernetesServices;
+ private Map<String, KubernetesService> kubernetesServices;
private String kubernetesClusterId;
public ClusterContext(String applicationId, String cartridgeType, String clusterId, String payload, String hostName,
boolean isLbCluster, Properties properties) {
+ this.kubernetesServices = new HashMap<>();
this.applicationId = applicationId;
this.cartridgeType = cartridgeType;
this.clusterId = clusterId;
@@ -115,12 +118,16 @@ public class ClusterContext implements Serializable {
this.properties = properties;
}
- public List<KubernetesService> getKubernetesServices() {
- return kubernetesServices;
+ public Collection<KubernetesService> getKubernetesServices() {
+ return kubernetesServices.values();
}
- public void setKubernetesServices(List<KubernetesService> kubernetesServices) {
- this.kubernetesServices = kubernetesServices;
+ public void addKubernetesService(KubernetesService kubernetesService) {
+ this.kubernetesServices.put(kubernetesService.getId(), kubernetesService);
+ }
+
+ public void removeKubernetesService(String serviceName) {
+ kubernetesServices.remove(serviceName);
}
public void setKubernetesClusterId(String kubernetesClusterId) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/0248d206/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
index 38c49a0..7e6d557 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/domain/kubernetes/KubernetesClusterContext.java
@@ -21,10 +21,10 @@ package org.apache.stratos.cloud.controller.domain.kubernetes;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.kubernetes.client.KubernetesApiClient;
+import org.apache.stratos.messaging.domain.topology.KubernetesService;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -33,28 +33,28 @@ import java.util.concurrent.atomic.AtomicLong;
public class KubernetesClusterContext implements Serializable {
private static final long serialVersionUID = -802025758806195791L;
+
private static final Log log = LogFactory.getLog(KubernetesClusterContext.class);
- // id of the Kubernetes cluster
private String kubernetesClusterId;
private int upperPort;
private int lowerPort;
- // kubernetes master ip
private String masterIp;
private String masterPort;
- // available list of ports
- private List<Integer> servicePorts;
- // kubernetes client API instance
+ private List<Integer> servicePortSequence;
+ private Map<String, KubernetesService> kubernetesServices;
private transient KubernetesApiClient kubApi;
private AtomicLong serviceSeqNo;
private AtomicLong podSeqNo;
public KubernetesClusterContext(String id, String masterIp, String masterPort, int lowerPort, int upperPort) {
- servicePorts = new ArrayList<Integer>();
+ this.servicePortSequence = new ArrayList<>();
+ this.kubernetesServices = new HashMap<>();
+
this.lowerPort = lowerPort;
this.upperPort = upperPort;
// Generate the ports
- generateServicePorts(lowerPort, upperPort);
+ initializeServicePortSequence(lowerPort, upperPort);
this.kubernetesClusterId = id;
this.masterIp = masterIp;
this.masterPort = masterPort;
@@ -77,33 +77,58 @@ public class KubernetesClusterContext implements Serializable {
}
public List<Integer> getServicePorts() {
- return servicePorts;
+ return servicePortSequence;
}
public void setServicePorts(List<Integer> servicePorts) {
- this.servicePorts = servicePorts;
+ this.servicePortSequence = servicePorts;
}
+ /**
+ * Removes and returns the next available service port.
+ * @return the first port in the sequence, or -1 if the sequence is empty
+ */
public int getNextServicePort() {
- if (servicePorts.isEmpty()) {
+ if (servicePortSequence.isEmpty()) {
return -1;
}
- return servicePorts.remove(0);
+ return servicePortSequence.remove(0);
}
+ /**
+ * Deallocate a service port by adding it again to the sequence.
+ * @param port the port to put back; ignored if it is already in the sequence
+ */
public void deallocatePort(int port) {
- if (!servicePorts.contains(port)) {
- servicePorts.add(port);
- // TODO Sort elements
+ if (!servicePortSequence.contains(port)) {
+ servicePortSequence.add(port);
+ Collections.sort(servicePortSequence);
}
}
- private void generateServicePorts(int lowerPort, int upperPort) {
+ /**
+ * Initialize service port sequence according to the given port range.
+ * @param lowerPort first port of the range (inclusive)
+ * @param upperPort last port of the range (inclusive)
+ */
+ private void initializeServicePortSequence(int lowerPort, int upperPort) {
for (int port = lowerPort; port <= upperPort; port++) {
- servicePorts.add(port);
+ servicePortSequence.add(port);
}
}
+ public void addKubernetesService(KubernetesService service) {
+ kubernetesServices.put(service.getId(), service);
+ }
+
+ public void removeKubernetesService(String serviceName) {
+ kubernetesServices.remove(serviceName);
+ }
+
+ public Collection<KubernetesService> getKubernetesServices() {
+ return kubernetesServices.values();
+ }
+
public String getMasterIp() {
return masterIp;
}
@@ -151,7 +176,7 @@ public class KubernetesClusterContext implements Serializable {
public int hashCode() {
final int prime = 31;
int result = 1;
- result = prime * result + ((servicePorts == null) ? 0 : servicePorts.hashCode());
+ result = prime * result + ((servicePortSequence == null) ? 0 : servicePortSequence.hashCode());
result = prime * result + ((kubernetesClusterId == null) ? 0 : kubernetesClusterId.hashCode());
result = prime * result + lowerPort;
result = prime * result + ((masterIp == null) ? 0 : masterIp.hashCode());
@@ -169,10 +194,10 @@ public class KubernetesClusterContext implements Serializable {
if (getClass() != obj.getClass())
return false;
KubernetesClusterContext other = (KubernetesClusterContext) obj;
- if (servicePorts == null) {
- if (other.servicePorts != null)
+ if (servicePortSequence == null) {
+ if (other.servicePortSequence != null)
return false;
- } else if (!servicePorts.equals(other.servicePorts))
+ } else if (!servicePortSequence.equals(other.servicePortSequence))
return false;
if (kubernetesClusterId == null) {
if (other.kubernetesClusterId != null)
http://git-wip-us.apache.org/repos/asf/stratos/blob/0248d206/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java
index 7c987b6..9796a8c 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/iaases/kubernetes/KubernetesIaas.java
@@ -19,6 +19,7 @@
package org.apache.stratos.cloud.controller.iaases.kubernetes;
+import com.google.common.collect.Lists;
import io.fabric8.kubernetes.api.model.*;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang.NotImplementedException;
@@ -467,27 +468,10 @@ public class KubernetesIaas extends Iaas {
sessionAffinity = sessionAffinityProperty.getValue();
}
- List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
- if (kubernetesServices == null) {
- kubernetesServices = new ArrayList<KubernetesService>();
- }
-
// Prepare minion public IP addresses
- List<String> minionPrivateIPList = new ArrayList<String>();
- List<String> minionPublicIPList = new ArrayList<String>();
- KubernetesHost[] kubernetesHosts = kubernetesCluster.getKubernetesHosts();
- if ((kubernetesHosts == null) || (kubernetesHosts.length == 0) || (kubernetesHosts[0] == null)) {
- throw new RuntimeException("Hosts not found in kubernetes cluster: [cluster] "
- + kubernetesCluster.getClusterId());
- }
- for (KubernetesHost host : kubernetesHosts) {
- if (host != null) {
- minionPrivateIPList.add(host.getPrivateIPAddress());
- minionPublicIPList.add(host.getPublicIPAddress());
- }
- }
+ List<String> minionPublicIPList = prepareMinionIPAddresses(kubernetesCluster);
if (log.isDebugEnabled()) {
- log.debug(String.format("Minion private IPs: %s", minionPrivateIPList));
+ log.debug(String.format("Minion public IPs: %s", minionPublicIPList));
}
Collection<ClusterPortMapping> clusterPortMappings = CloudControllerContext.getInstance()
@@ -495,15 +479,12 @@ public class KubernetesIaas extends Iaas {
if (clusterPortMappings != null) {
String serviceLabel = DigestUtils.md5Hex(clusterId);
- if(log.isDebugEnabled()) {
- log.debug("Retrieving existing kubernetes services...");
- }
- List<Service> services = kubernetesApi.getServices();
+ Collection<KubernetesService> kubernetesServices = kubernetesClusterContext.getKubernetesServices();
for (ClusterPortMapping clusterPortMapping : clusterPortMappings) {
// Skip if already created
int containerPort = clusterPortMapping.getPort();
- if (kubernetesServiceExist(services, serviceLabel, containerPort)) {
+ if (kubernetesServiceExist(kubernetesServices, containerPort)) {
if(log.isDebugEnabled()) {
log.debug(String.format("Kubernetes service already exists: [kubernetes-cluster] %s " +
"[cluster] %s [service-label] %s [container-port] %d ",
@@ -514,12 +495,12 @@ public class KubernetesIaas extends Iaas {
// Find next service sequence no
long serviceSeqNo = kubernetesClusterContext.getServiceSeqNo().incrementAndGet();
- String serviceId = KubernetesIaasUtil.fixSpecialCharacters("service" + "-" + (serviceSeqNo));
+ String serviceName = KubernetesIaasUtil.fixSpecialCharacters("service" + "-" + (serviceSeqNo));
if (log.isInfoEnabled()) {
log.info(String.format("Creating kubernetes service: [cluster] %s [service] %s [service-label] %s " +
"[protocol] %s [service-port] %d [container-port] %s", clusterId,
- serviceId, serviceLabel, clusterPortMapping.getProtocol(),
+ serviceName, serviceLabel, clusterPortMapping.getProtocol(),
clusterPortMapping.getKubernetesServicePort(), containerPort));
}
@@ -530,15 +511,15 @@ public class KubernetesIaas extends Iaas {
try {
// If kubernetes service is already created, skip creating a new one
- if (kubernetesApi.getService(serviceId) == null) {
+ if (kubernetesApi.getService(serviceName) == null) {
// Services need to use minions private IP addresses for creating iptable rules
- kubernetesApi.createService(serviceId, serviceLabel, servicePort, serviceType, containerPortName,
+ kubernetesApi.createService(serviceName, serviceLabel, servicePort, serviceType, containerPortName,
containerPort, sessionAffinity);
} else {
if (log.isDebugEnabled()) {
log.debug(String.format("Kubernetes service is already created: [cluster] %s [service] %s " +
"[protocol] %s [service-port] %d [container-port] %d", clusterId,
- serviceId, clusterPortMapping.getProtocol(), servicePort, containerPort));
+ serviceName, clusterPortMapping.getProtocol(), servicePort, containerPort));
}
}
} finally {
@@ -551,7 +532,10 @@ public class KubernetesIaas extends Iaas {
} catch (InterruptedException ignore) {
}
- Service service = kubernetesApi.getService(serviceId);
+ Service service = kubernetesApi.getService(serviceName);
+ if(service == null) {
+ throw new KubernetesClientException("Kubernetes service not found: [service] " + serviceName);
+ }
KubernetesService kubernetesService = new KubernetesService();
kubernetesService.setId(service.getMetadata().getName());
@@ -572,37 +556,45 @@ public class KubernetesIaas extends Iaas {
}
kubernetesService.setContainerPort(containerPort);
- kubernetesServices.add(kubernetesService);
+
+ kubernetesClusterContext.addKubernetesService(kubernetesService);
+ clusterContext.addKubernetesService(kubernetesService);
+ CloudControllerContext.getInstance().persist();
if (log.isInfoEnabled()) {
log.info(String.format("Kubernetes service successfully created: [cluster] %s [service] %s " +
"[protocol] %s [node-port] %d [container-port] %s", clusterId,
- serviceId, clusterPortMapping.getProtocol(), servicePort, containerPort));
+ serviceName, clusterPortMapping.getProtocol(), servicePort, containerPort));
}
}
}
+ }
- // Add kubernetes services to cluster context and persist
- clusterContext.setKubernetesServices(kubernetesServices);
- CloudControllerContext.getInstance().persist();
+ private List<String> prepareMinionIPAddresses(KubernetesCluster kubernetesCluster) {
+ List<String> minionPublicIPList = new ArrayList<String>();
+ KubernetesHost[] kubernetesHosts = kubernetesCluster.getKubernetesHosts();
+ if ((kubernetesHosts == null) || (kubernetesHosts.length == 0) || (kubernetesHosts[0] == null)) {
+ throw new RuntimeException("Hosts not found in kubernetes cluster: [cluster] "
+ + kubernetesCluster.getClusterId());
+ }
+ for (KubernetesHost host : kubernetesHosts) {
+ if (host != null) {
+ minionPublicIPList.add(host.getPublicIPAddress());
+ }
+ }
+ return minionPublicIPList;
}
/**
* Returns true if a kubernetes service exists with the given container port
* @param services
- * @param serviceLabel
* @param containerPort
* @return
*/
- private boolean kubernetesServiceExist(List<Service> services, String serviceLabel, int containerPort) {
- for(Service service : services) {
- Map<String, String> labels = service.getMetadata().getLabels();
- if((labels != null) && (labels.get(KubernetesConstants.LABEL_NAME).equals(serviceLabel))) {
- for (ServicePort port : service.getSpec().getPorts()) {
- if (port.getPort() == containerPort) {
- return true;
- }
- }
+ private boolean kubernetesServiceExist(Collection<KubernetesService> services, int containerPort) {
+ for(KubernetesService service : services) {
+ if (service.getContainerPort() == containerPort) {
+ return true;
}
}
return false;
@@ -740,13 +732,16 @@ public class KubernetesIaas extends Iaas {
KubernetesApiClient kubApi = kubClusterContext.getKubApi();
// Remove kubernetes services
- List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
+ List<KubernetesService> kubernetesServices = Lists.newArrayList(clusterContext.getKubernetesServices());
if (kubernetesServices != null) {
for (KubernetesService kubernetesService : kubernetesServices) {
try {
- kubApi.deleteService(kubernetesService.getId());
- int allocatedPort = kubernetesService.getPort();
- kubClusterContext.deallocatePort(allocatedPort);
+ String serviceId = kubernetesService.getId();
+ kubApi.deleteService(serviceId);
+
+ kubClusterContext.deallocatePort(kubernetesService.getPort());
+ kubClusterContext.removeKubernetesService(serviceId);
+ clusterContext.removeKubernetesService(serviceId);
} catch (KubernetesClientException e) {
log.error("Could not remove kubernetes service: [cluster-id] " + clusterId, e);
}
@@ -934,23 +929,32 @@ public class KubernetesIaas extends Iaas {
*/
public static void removeKubernetesServices(String applicationId, String clusterId) {
- ClusterContext clusterContext =
- CloudControllerContext.getInstance().getClusterContext(clusterId);
+ ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
+
if (clusterContext != null) {
String kubernetesClusterId = clusterContext.getKubernetesClusterId();
+
if (org.apache.commons.lang3.StringUtils.isNotBlank(kubernetesClusterId)) {
KubernetesClusterContext kubernetesClusterContext =
CloudControllerContext.getInstance().getKubernetesClusterContext(kubernetesClusterId);
+
if (kubernetesClusterContext != null) {
KubernetesApiClient kubernetesApiClient = kubernetesClusterContext.getKubApi();
- for (KubernetesService kubernetesService : clusterContext.getKubernetesServices()) {
+ ArrayList<KubernetesService> kubernetesServices = Lists.newArrayList(clusterContext.getKubernetesServices());
+
+ for (KubernetesService kubernetesService : kubernetesServices) {
+ String serviceId = kubernetesService.getId();
log.info(String.format("Deleting kubernetes service: [application-id] %s " +
- "[service-id] %s", applicationId, kubernetesService.getId()));
+ "[service-id] %s", applicationId, serviceId));
+
try {
- kubernetesApiClient.deleteService(kubernetesService.getId());
+ kubernetesApiClient.deleteService(serviceId);
+ kubernetesClusterContext.deallocatePort(kubernetesService.getPort());
+ kubernetesClusterContext.removeKubernetesService(serviceId);
+ clusterContext.removeKubernetesService(serviceId);
} catch (KubernetesClientException e) {
log.error(String.format("Could not delete kubernetes service: [application-id] %s " +
- "[service-id] %s", applicationId, kubernetesService.getId()));
+ "[service-id] %s", applicationId, serviceId));
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/0248d206/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
index ecd2728..26d1bbd 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/topology/TopologyBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.stratos.cloud.controller.messaging.topology;
+import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -455,7 +456,7 @@ public class TopologyBuilder {
Cluster cluster = service.getCluster(memberContext.getClusterId());
String clusterId = cluster.getClusterId();
ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
- List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
+ List<KubernetesService> kubernetesServices = Lists.newArrayList(clusterContext.getKubernetesServices());
if (kubernetesServices != null) {
cluster.setKubernetesServices(kubernetesServices);
@@ -481,7 +482,7 @@ public class TopologyBuilder {
}
}
- private static int findKubernetesServicePort(String clusterId, List<KubernetesService> kubernetesServices,
+ private static int findKubernetesServicePort(String clusterId, Collection<KubernetesService> kubernetesServices,
PortMapping portMapping) {
for (KubernetesService kubernetesService : kubernetesServices) {
if (kubernetesService.getProtocol().equals(portMapping.getProtocol())) {
@@ -606,7 +607,7 @@ public class TopologyBuilder {
List<PortMapping> portMappings = Arrays.asList(cartridge.getPortMappings());
String clusterId = cluster.getClusterId();
ClusterContext clusterContext = CloudControllerContext.getInstance().getClusterContext(clusterId);
- List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
+ Collection<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
for (PortMapping portMapping : portMappings) {
if (kubernetesServices != null) {
@@ -853,7 +854,7 @@ public class TopologyBuilder {
clusterStatusClusterActivatedEvent.getInstanceId());
try {
TopologyManager.acquireWriteLock();
- List<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
+ Collection<KubernetesService> kubernetesServices = clusterContext.getKubernetesServices();
if (kubernetesServices != null) {