You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2014/10/08 18:53:08 UTC

[14/27] git commit: on start containers method, do a label query and find the Pods being created and creating corresponding MemberContexts.

on start containers method, do a label query and find the Pods being created and creating corresponding MemberContexts.


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/55150013
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/55150013
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/55150013

Branch: refs/heads/container-autoscaling
Commit: 55150013c6122d6217a5e1235cd1a8ae8f5baf64
Parents: 1dddf29
Author: Nirmal Fernando <ni...@gmail.com>
Authored: Tue Oct 7 18:15:22 2014 +0530
Committer: Nirmal Fernando <ni...@gmail.com>
Committed: Wed Oct 8 22:21:29 2014 +0530

----------------------------------------------------------------------
 .../impl/CloudControllerServiceImpl.java        | 166 ++++++++++---------
 1 file changed, 87 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/55150013/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
index 18269e6..a413882 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/impl/CloudControllerServiceImpl.java
@@ -28,8 +28,9 @@ import org.apache.stratos.cloud.controller.concurrent.PartitionValidatorCallable
 import org.apache.stratos.cloud.controller.concurrent.ThreadExecutor;
 import org.apache.stratos.cloud.controller.deployment.partition.Partition;
 import org.apache.stratos.cloud.controller.exception.*;
-import org.apache.stratos.cloud.controller.functions.MemberContextToKubernetesService;
-import org.apache.stratos.cloud.controller.functions.MemberContextToReplicationController;
+import org.apache.stratos.cloud.controller.functions.ContainerClusterContextToKubernetesService;
+import org.apache.stratos.cloud.controller.functions.ContainerClusterContextToReplicationController;
+import org.apache.stratos.cloud.controller.functions.PodToMemberContext;
 import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
 import org.apache.stratos.cloud.controller.interfaces.Iaas;
 import org.apache.stratos.cloud.controller.persist.Deserializer;
@@ -45,6 +46,8 @@ import org.apache.stratos.cloud.controller.validate.interfaces.PartitionValidato
 import org.apache.stratos.common.constants.StratosConstants;
 import org.apache.stratos.kubernetes.client.KubernetesApiClient;
 import org.apache.stratos.kubernetes.client.exceptions.KubernetesClientException;
+import org.apache.stratos.kubernetes.client.model.Label;
+import org.apache.stratos.kubernetes.client.model.Pod;
 import org.apache.stratos.kubernetes.client.model.ReplicationController;
 import org.apache.stratos.kubernetes.client.model.Service;
 import org.apache.stratos.messaging.domain.topology.Member;
@@ -1330,33 +1333,32 @@ public class CloudControllerServiceImpl implements CloudControllerService {
     }
 
 	@Override
-	public MemberContext startContainers(MemberContext memberContext)
+	public MemberContext[] startContainers(ContainerClusterContext containerClusterContext)
 			throws UnregisteredCartridgeException {
 		
 		if(log.isDebugEnabled()) {
-    		log.debug("CloudControllerServiceImpl:startContainer");
+    		log.debug("CloudControllerServiceImpl:startContainers");
     	}
 
-        if (memberContext == null) {
-            String msg = "Instance start-up failed. Member is null.";
+        if (containerClusterContext == null) {
+            String msg = "Instance start-up failed. ContainerClusterContext is null.";
             log.error(msg);
             throw new IllegalArgumentException(msg);
         }
 
-        String clusterId = memberContext.getClusterId();
+        String clusterId = containerClusterContext.getClusterId();
         if(log.isDebugEnabled()) {
-        	log.debug("Received an instance spawn request : " + memberContext.toString());
+        	log.debug("Received an instance spawn request : " + containerClusterContext.toString());
         }
 
         ClusterContext ctxt = dataHolder.getClusterContext(clusterId);
 
         if (ctxt == null) {
-            String msg = "Instance start-up failed. Invalid cluster id. " + memberContext.toString();
+            String msg = "Instance start-up failed. Invalid cluster id. " + containerClusterContext.toString();
             log.error(msg);
             throw new IllegalArgumentException(msg);
         }
         
-        
         String cartridgeType = ctxt.getCartridgeType();
 
         Cartridge cartridge = dataHolder.getCartridge(cartridgeType);
@@ -1364,59 +1366,28 @@ public class CloudControllerServiceImpl implements CloudControllerService {
         if (cartridge == null) {
             String msg =
                          "Instance start-up failed. No matching Cartridge found [type] "+cartridgeType +". "+
-                                 memberContext.toString();
+                                 containerClusterContext.toString();
             log.error(msg);
             throw new UnregisteredCartridgeException(msg);
         }
 
-        memberContext.setCartridgeType(cartridgeType);
-
         try {
-            // generating the Unique member ID...
-            String memberID = generateMemberId(clusterId);
-            memberContext.setMemberId(memberID);
-
-			String kubernetesClusterId = CloudControllerUtil.getProperty(ctxt.getProperties(), 
-					StratosConstants.KUBERNETES_CLUSTER_ID);
-			
-			if (kubernetesClusterId == null) {
-				String msg = "Instance start-up failed. Cannot find '"+
-						StratosConstants.KUBERNETES_CLUSTER_ID+"'. " + ctxt;
-				log.error(msg);
-				throw new IllegalArgumentException(msg);
-			}
-			
-			String kubernetesMasterIp = CloudControllerUtil.getProperty(memberContext.getProperties(), 
-					StratosConstants.KUBERNETES_MASTER_IP);
-			
-			if (kubernetesMasterIp == null) {
-				String msg = "Instance start-up failed. Cannot find '"+
-						StratosConstants.KUBERNETES_MASTER_IP+"'. " + memberContext;
-				log.error(msg);
-				throw new IllegalArgumentException(msg);
-			}
-			
-			String kubernetesPortRange = CloudControllerUtil.getProperty(memberContext.getProperties(), 
-					StratosConstants.KUBERNETES_PORT_RANGE);
-			
-			if (kubernetesPortRange == null) {
-				String msg = "Instance start-up failed. Cannot find '"+
-						StratosConstants.KUBERNETES_PORT_RANGE+"'. " + memberContext;
-				log.error(msg);
-				throw new IllegalArgumentException(msg);
-			}
+            validateProperty(StratosConstants.KUBERNETES_MIN_REPLICAS, ctxt);
+            String kubernetesClusterId = validateProperty(StratosConstants.KUBERNETES_CLUSTER_ID, ctxt);
+            String kubernetesMasterIp = validateProperty(StratosConstants.KUBERNETES_MASTER_IP, containerClusterContext);
+            String kubernetesPortRange = validateProperty(StratosConstants.KUBERNETES_PORT_RANGE, containerClusterContext);
 			
 			KubernetesClusterContext kubClusterContext = getKubernetesClusterContext(kubernetesClusterId, kubernetesMasterIp, kubernetesPortRange);
 			
 			KubernetesApiClient kubApi = kubClusterContext.getKubApi();
 			
 			// first let's create a replication controller.
-			MemberContextToReplicationController controllerFunction = new MemberContextToReplicationController();
-			ReplicationController controller = controllerFunction.apply(memberContext);
+			ContainerClusterContextToReplicationController controllerFunction = new ContainerClusterContextToReplicationController();
+			ReplicationController controller = controllerFunction.apply(containerClusterContext);
 			
 			if (log.isDebugEnabled()) {
 				log.debug("Cloud Controller is delegating request to start a replication controller "+controller+
-						" for "+ memberContext + " to Kubernetes layer.");
+						" for "+ containerClusterContext + " to Kubernetes layer.");
 			}
 			
 			kubApi.createReplicationController(controller);
@@ -1427,12 +1398,12 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 			}
 			
 			// secondly let's create a kubernetes service proxy to load balance these containers
-			MemberContextToKubernetesService serviceFunction = new MemberContextToKubernetesService();
-			Service service = serviceFunction.apply(memberContext);
+			ContainerClusterContextToKubernetesService serviceFunction = new ContainerClusterContextToKubernetesService();
+			Service service = serviceFunction.apply(containerClusterContext);
 			
 			if (log.isDebugEnabled()) {
 				log.debug("Cloud Controller is delegating request to start a service "+service+
-						" for "+ memberContext + " to Kubernetes layer.");
+						" for "+ containerClusterContext + " to Kubernetes layer.");
 			}
 			
 			kubApi.createService(service);
@@ -1442,44 +1413,81 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 						+ controller + " via Kubernetes layer.");
 			}
 			
-            memberContext.setPublicIpAddress(kubernetesMasterIp);
-            memberContext.setPrivateIpAddress(kubernetesMasterIp);
-            memberContext.setProperties(CloudControllerUtil.addProperty(memberContext
-                    .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
-                    CloudControllerUtil.getProperty(ctxt.getProperties(),
-                            StratosConstants.ALLOCATED_SERVICE_HOST_PORT)));
-            dataHolder.addMemberContext(memberContext);
-
+			// create a label query
+			Label l = new Label();
+			l.setName(clusterId);
+			// execute the label query
+			Pod[] newlyCreatedPods = kubApi.getSelectedPods(new Label[]{l});
+			List<MemberContext> memberContexts = new ArrayList<MemberContext>();
+			
+			PodToMemberContext podToMemberContextFunc = new PodToMemberContext();
+			// generate Member Contexts
+			for (Pod pod : newlyCreatedPods) {
+                MemberContext context = podToMemberContextFunc.apply(pod);
+                context.setCartridgeType(cartridgeType);
+                context.setClusterId(clusterId);
+                
+                context.setProperties(CloudControllerUtil.addProperty(containerClusterContext
+                        .getProperties(), StratosConstants.ALLOCATED_SERVICE_HOST_PORT,
+                        CloudControllerUtil.getProperty(ctxt.getProperties(),
+                                StratosConstants.ALLOCATED_SERVICE_HOST_PORT)));
+                dataHolder.addMemberContext(context);
+                
+                // trigger topology
+                // update the topology with the newly spawned member
+                TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId, null,
+                        kubernetesMasterIp, kubernetesMasterIp, context);
+                // publish data
+                // TODO
+//                CartridgeInstanceDataPublisher.publish(context.getMemberId(), null, null, context.getClusterId(), cartridgeType, MemberStatus.Created.toString(), node);
+                
+                memberContexts.add(context);
+            }
+			
 			// persist in registry
 			persist();
 
-			// trigger topology
-			// update the topology with the newly spawned member
-			TopologyBuilder.handleMemberSpawned(cartridgeType, clusterId, null,
-					kubernetesMasterIp, kubernetesMasterIp, memberContext);
-
-			// publish data
-			// TODO
-			// CartridgeInstanceDataPublisher.publish(memberID,
-			// memberContext.getPartition().getId(),
-			// memberContext.getNetworkPartitionId(),
-			// memberContext.getClusterId(),
-			// cartridgeType,
-			// MemberStatus.Created.toString(),
-			// node);
+            log.info("Kubernetes entities are successfully starting up. "+containerClusterContext.toString());
 
-            log.info("Kubernetes entities are successfully starting up. "+memberContext.toString());
-
-            return memberContext;
+            return memberContexts.toArray(new MemberContext[0]);
 
         } catch (Exception e) {
-            String msg = "Failed to start an instance. " + memberContext.toString()+" Cause: "+e.getMessage();
+            String msg = "Failed to start an instance. " + containerClusterContext.toString()+" Cause: "+e.getMessage();
             log.error(msg, e);
             throw new IllegalStateException(msg, e);
         }
 	}
 
-	private KubernetesClusterContext getKubernetesClusterContext(
+	private String validateProperty(String property, ClusterContext ctxt) {
+
+	    String propVal = CloudControllerUtil.getProperty(ctxt.getProperties(), property);
+        
+        if (propVal == null) {
+            String msg = "Instance start-up failed. Cannot find '"+
+                    StratosConstants.KUBERNETES_MIN_REPLICAS+"' in " + ctxt;
+            log.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+        
+        return propVal;
+    }
+	
+	private String validateProperty(String property, ContainerClusterContext ctxt) {
+
+        String propVal = CloudControllerUtil.getProperty(ctxt.getProperties(), property);
+        
+        if (propVal == null) {
+            String msg = "Instance start-up failed. Cannot find '"+
+                    StratosConstants.KUBERNETES_MIN_REPLICAS+"' in " + ctxt;
+            log.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+        
+        return propVal;
+        
+    }
+
+    private KubernetesClusterContext getKubernetesClusterContext(
 			String kubernetesClusterId, String kubernetesMasterIp,
 			String kubernetesPortRange) {