You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@airavata.apache.org by he...@apache.org on 2013/03/12 15:41:27 UTC

svn commit: r1455554 - in /airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2: ./ AmazonInstanceScheduler.java AmazonUtil.java GreedyScheduler.java SchedulingAlgorithm.java

Author: heshan
Date: Tue Mar 12 14:41:26 2013
New Revision: 1455554

URL: http://svn.apache.org/r1455554
Log:
AIRAVATA-782 Adding code for EC2 scheduling algorithm. This is a greedy algorithm which monitors CPU, memory utilization of running instances to schedule the next compute job. Based on the scheduling decision, the next job will run on the instance which is unter-utilized. These, are not yet being used by the Workflow Interpreter. Once, the UI components for EC2 are added to the trunk, we can integrate these to the Workflow Interpreter. 

Added:
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/AmazonInstanceScheduler.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/AmazonUtil.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/GreedyScheduler.java
    airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/SchedulingAlgorithm.java

Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/AmazonInstanceScheduler.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/AmazonInstanceScheduler.java?rev=1455554&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/AmazonInstanceScheduler.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/AmazonInstanceScheduler.java Tue Mar 12 14:41:26 2013
@@ -0,0 +1,212 @@
+package org.apache.airavata.gfac.ec2;
+
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
+import com.amazonaws.services.cloudwatch.model.Datapoint;
+import com.amazonaws.services.cloudwatch.model.Dimension;
+import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
+import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult;
+import com.amazonaws.services.ec2.AmazonEC2Client;
+import com.amazonaws.services.ec2.model.DescribeInstancesResult;
+import com.amazonaws.services.ec2.model.Instance;
+import com.amazonaws.services.ec2.model.Reservation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.security.spec.InvalidKeySpecException;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+public class AmazonInstanceScheduler {
+    private static final Logger log = LoggerFactory.getLogger(AmazonInstanceScheduler.class);
+
+    /* Maximum number of instances that the Scheduler will create*/
+    //private static final int MAX_INSTANCE_COUNT = 3;
+
+    /* Maximum number of minutes an instance should be kept alive*/
+    public static final int INSTANCE_UP_TIME_THRESHOLD = 60;
+
+    private static volatile AmazonInstanceScheduler scheduler = null;
+
+    private static String imageId = null;
+
+    private static AWSCredentials credential = null;
+
+    private static AmazonEC2Client ec2client = null;
+
+    /* The time interval(minutes) in which the instances will be checked whether they have timed-out*/
+    public static final long TERMINATE_THREAD_UPDATE_INTERVAL = 5;
+
+    public static AmazonInstanceScheduler getInstance(String imageId, String accessKey, String secretKey)
+            throws IOException, InvalidKeySpecException, NoSuchAlgorithmException {
+
+        if(scheduler == null) {
+            synchronized (AmazonInstanceScheduler.class) {
+                if(scheduler == null) {
+                    new Thread() {
+                        @Override
+                        public void run() {
+                            //noinspection InfiniteLoopStatement
+                            while(true) {
+                                try {
+                                    Thread.sleep(TERMINATE_THREAD_UPDATE_INTERVAL * 60 * 1000);
+                                } catch (InterruptedException e ) {
+                                    // do-nothing
+                                }
+
+                                try {
+                                    terminateTimedOutAmazonInstances();
+                                } catch (Throwable e) {
+                                    e.printStackTrace();
+                                }
+                            }
+
+                        }
+
+                    }.start();
+
+                    scheduler = new AmazonInstanceScheduler();
+                }
+            }
+        }
+
+        AmazonInstanceScheduler.imageId = imageId;
+        AmazonInstanceScheduler.credential = new BasicAWSCredentials(accessKey, secretKey);
+        AmazonInstanceScheduler.ec2client = new AmazonEC2Client(credential);
+
+        return scheduler;
+    }
+
+
+    /**
+     * Returns the amazon instance id of the amazon instance which is having the minimum
+     * CPU utilization (out of the already running instances). If the instance which
+     * is having the minimum CPU utilization exceeds 80%, ami-id will be returned
+     * instead of a an instance id. If a particular running instance's uptime is
+     * greater than 55 minutes, that instance will be shut down.
+     *
+     * @return instance id
+     * @throws NoSuchAlgorithmException
+     * @throws InvalidKeySpecException
+     * @throws IOException
+     */
+    public String getScheduledAmazonInstance()
+            throws NoSuchAlgorithmException, InvalidKeySpecException, IOException {
+
+        SchedulingAlgorithm greedyAglo = new GreedyScheduler();
+        return greedyAglo.getScheduledAmazonInstance(ec2client,imageId, credential);
+    }
+
+    /**
+     * Terminates the Amazon instances that are timed out. Timed out refers to the
+     * instances which have been running for more than the INSTANCE_UP_TIME_THRESHOLD.
+     */
+    private static void terminateTimedOutAmazonInstances(){
+        System.out.println("Checking for timed-out instances");
+        List<Instance> instanceList = loadInstances(ec2client);
+        for (Instance instance : instanceList) {
+            String instanceId = instance.getInstanceId();
+
+            long upTime = getInstanceUptime(instance);
+            // if the instance up time is greater than the threshold, terminate the instance
+            if (upTime > INSTANCE_UP_TIME_THRESHOLD) {
+                List<String> requestIds = new ArrayList<String>();
+                requestIds.add(instanceId);
+                // terminate instance
+                System.out.println("Terminating the instance " + instanceId +
+                        " as the up time threshold is exceeded");
+                AmazonUtil.terminateInstances(requestIds);
+            }
+        }
+
+    }
+
+    /**
+     * Calculates the instance up time in minutes.
+     *
+     * @param instance instance to be monitored.
+     * @return up time of the instance.
+     */
+    private static long getInstanceUptime(Instance instance) {
+        Date startTime = instance.getLaunchTime();
+        Date today = new Date();
+        long diff = (today.getTime() - startTime.getTime()) / (1000 * 60);
+        System.out.println("Instance launch time   : " + startTime);
+        System.out.println("Instance up time (mins): " + diff);
+        return diff;
+    }
+
+    /**
+     * Monitors the CPU Utilization using Amazon Cloud Watch. In order to monitor the instance, Cloud Watch Monitoring
+     * should be enabled for the running instance.
+     *
+     * @param credential EC2 credentials
+     * @param instanceId instance id
+     * @return average CPU utilization of the instance
+     */
+    public static double monitorInstance(AWSCredentials credential, String instanceId) {
+        try {
+            AmazonCloudWatchClient cw = new AmazonCloudWatchClient(credential) ;
+
+            long offsetInMilliseconds = 1000 * 60 * 60 * 24;
+            GetMetricStatisticsRequest request = new GetMetricStatisticsRequest()
+                    .withStartTime(new Date(new Date().getTime() - offsetInMilliseconds))
+                    .withNamespace("AWS/EC2")
+                    .withPeriod(60 * 60)
+                    .withDimensions(new Dimension().withName("InstanceId").withValue(instanceId))
+                    .withMetricName("CPUUtilization")
+                    .withStatistics("Average", "Maximum")
+                    .withEndTime(new Date());
+            GetMetricStatisticsResult getMetricStatisticsResult = cw.getMetricStatistics(request);
+
+            double avgCPUUtilization = 0;
+            List dataPoint = getMetricStatisticsResult.getDatapoints();
+            for (Object aDataPoint : dataPoint) {
+                Datapoint dp = (Datapoint) aDataPoint;
+                avgCPUUtilization = dp.getAverage();
+                log.info(instanceId + " instance's average CPU utilization : " + dp.getAverage());
+            }
+
+            return avgCPUUtilization;
+
+        } catch (AmazonServiceException ase) {
+            log.error("Caught an AmazonServiceException, which means the request was made  "
+                    + "to Amazon EC2, but was rejected with an error response for some reason.");
+            log.error("Error Message:    " + ase.getMessage());
+            log.error("HTTP Status Code: " + ase.getStatusCode());
+            log.error("AWS Error Code:   " + ase.getErrorCode());
+            log.error("Error Type:       " + ase.getErrorType());
+            log.error("Request ID:       " + ase.getRequestId());
+
+        }
+        return 0;
+    }
+
+    /**
+     * Load instances associated with the given ec2 client
+     *
+     * @param ec2client ec2 client
+     * @return list of instances
+     */
+    public static List<Instance> loadInstances(AmazonEC2Client ec2client) {
+        List<Instance> resultList = new ArrayList<Instance>();
+        DescribeInstancesResult describeInstancesResult = ec2client.describeInstances();
+        List<Reservation> reservations = describeInstancesResult.getReservations();
+        for (Reservation reservation : reservations) {
+            for (Instance instance : reservation.getInstances()) {
+                System.out.println("instance       : " + instance);
+                if ("running".equalsIgnoreCase(instance.getState().getName())) {
+                    resultList.add(instance);
+                }
+            }
+        }
+        return resultList;
+    }
+
+}
+

Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/AmazonUtil.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/AmazonUtil.java?rev=1455554&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/AmazonUtil.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/AmazonUtil.java Tue Mar 12 14:41:26 2013
@@ -0,0 +1,121 @@
+package org.apache.airavata.gfac.ec2;
+
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.ec2.AmazonEC2;
+import com.amazonaws.services.ec2.AmazonEC2Client;
+import com.amazonaws.services.ec2.model.*;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+public class AmazonUtil {
+
+    /* Amazon EC2 instance type */
+    public final static String[] INSTANCE_TYPE =
+            { "t1.micro", "m1.small", "m1.large", "m1.xlarge", "m2.xlarge", "m2.2xlarge",
+                    "m2.4xlarge", "c1.medium", "c1.xlarge" };
+
+    private static AmazonEC2 getEC2Client() {
+        // TODO Heshan : Fix this properly after adding UI components.
+        String accessKey = "";
+        String secretKey = "";
+        AmazonEC2 ec2 = new AmazonEC2Client(new BasicAWSCredentials(accessKey, secretKey));
+        return ec2;
+    }
+
+    /**
+     * Launch a new EC2 instance
+     *
+     * @param amiId
+     * @param type
+     * @param number
+     * @return list of newly launched instances
+     */
+    public static List<Instance> launchInstance(String amiId, String type, Integer number) {
+        List<Instance> resultList = new ArrayList<Instance>();
+
+        RunInstancesRequest request = new RunInstancesRequest(amiId, number, number);
+        request.setInstanceType(type);
+
+        RunInstancesResult result = getEC2Client().runInstances(request);
+        resultList.addAll(result.getReservation().getInstances());
+        return resultList;
+    }
+
+    /**
+     * Launch a new EC2 instance
+     *
+     * @param amiId
+     * @param type
+     * @param number
+     * @param keyname
+     * @return list of newly launched instances
+     */
+    public static List<Instance> launchInstance(String amiId, String type, Integer number, String keyname) {
+        List<Instance> resultList = new ArrayList<Instance>();
+
+        RunInstancesRequest request = new RunInstancesRequest(amiId, number, number);
+        request.setInstanceType(type);
+        request.setKeyName(keyname);
+
+        RunInstancesResult result = getEC2Client().runInstances(request);
+        resultList.addAll(result.getReservation().getInstances());
+        return resultList;
+    }
+
+    /**
+     * Load instances
+     *
+     * @return list of instances
+     */
+    public static List<Instance> loadInstances() {
+        List<Instance> resultList = new ArrayList<Instance>();
+        DescribeInstancesResult describeInstancesResult = getEC2Client().describeInstances();
+        List<Reservation> reservations = describeInstancesResult.getReservations();
+        for (Iterator<Reservation> iterator = reservations.iterator(); iterator.hasNext();) {
+            Reservation reservation = iterator.next();
+            for (Instance instance : reservation.getInstances()) {
+                resultList.add(instance);
+            }
+        }
+        return resultList;
+    }
+
+    /**
+     * Load key pairs
+     *
+     * @return list of keypairs
+     */
+    public static List<String> loadKeypairs(){
+        List<String> resultList = new ArrayList<String>();
+        DescribeKeyPairsResult results = getEC2Client().describeKeyPairs();
+        for (KeyPairInfo key : results.getKeyPairs()) {
+            resultList.add(key.getKeyName());
+        }
+        return resultList;
+    }
+
+    /**
+     * Terminate instances
+     *
+     * @param instanceIds instance ids of the running instances.
+     */
+    public static void terminateInstances(List<String> instanceIds) {
+        // terminate
+        TerminateInstancesRequest request = new TerminateInstancesRequest(instanceIds);
+        getEC2Client().terminateInstances(request);
+    }
+
+    /**
+     * Terminate instances
+     *
+     * @param instanceIds  instance ids of the running instances.
+     */
+    public static void terminateInstances(String... instanceIds) {
+        // terminate
+        TerminateInstancesRequest request = new TerminateInstancesRequest();
+        getEC2Client().terminateInstances(request.withInstanceIds(instanceIds));
+    }
+
+}

Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/GreedyScheduler.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/GreedyScheduler.java?rev=1455554&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/GreedyScheduler.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/GreedyScheduler.java Tue Mar 12 14:41:26 2013
@@ -0,0 +1,72 @@
+package org.apache.airavata.gfac.ec2;
+
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.ec2.AmazonEC2Client;
+import com.amazonaws.services.ec2.model.Instance;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.security.spec.InvalidKeySpecException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GreedyScheduler implements SchedulingAlgorithm {
+
+    /**
+     * Returns the amazon instance id of the amazon instance which is having the minimum
+     * CPU utilization (out of the already running instances). If the instance which
+     * is having the minimum CPU utilization exceeds 80%, ami-id will be returned
+     * instead of a an instance id. If a particular running instance's uptime is
+     * greater than 55 minutes, that instance will be shut down.
+     *
+     * @return instance id
+     * @throws java.security.NoSuchAlgorithmException
+     * @throws java.security.spec.InvalidKeySpecException
+     * @throws java.io.IOException
+     */
+    public String getScheduledAmazonInstance(AmazonEC2Client ec2client, String imageId, AWSCredentials credential)
+            throws NoSuchAlgorithmException, InvalidKeySpecException, IOException {
+
+        Map<String, Double> instanceUtilMap = new HashMap<String, Double>();
+        List<Instance> instanceList = AmazonInstanceScheduler.loadInstances(ec2client);
+        // If there are no instances created at this point return the imageId
+        if(instanceList.isEmpty()){
+            return imageId;
+        }
+
+        for (Instance instance : instanceList) {
+            String instanceImageId = instance.getImageId();
+            String instanceId = instance.getInstanceId();
+            double avgCPUUtilization = AmazonInstanceScheduler.monitorInstance(credential, instanceId);
+
+            System.out.println("Image id         : " + instanceImageId);
+            System.out.println("Instance id      : " + instanceId);
+            System.out.println("CPU Utilization  : " + avgCPUUtilization);
+
+            //Storing the instance id, if that particular instance was created by the given AMI(imageId)
+            if(imageId.equalsIgnoreCase(instanceImageId)) {
+                instanceUtilMap.put(instanceId, avgCPUUtilization);
+            }
+        }
+
+        // Selects the instance with minimum CPU utilization
+        Map.Entry<String, Double> min = null;
+        for (Map.Entry<String, Double> entry : instanceUtilMap.entrySet()) {
+            if (min == null || min.getValue() > entry.getValue()) {
+                min = entry;
+            }
+        }
+
+        if((min!=null) && (min.getValue()<80)) {
+            System.out.println("Use the existing instance " + min.getKey() + " with CPU Utilization : " + min.getValue());
+            return min.getKey();
+        } else {
+            System.out.println("Create a new instance using AMI : " + imageId);
+            return imageId;
+        }
+    }
+
+}
+

Added: airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/SchedulingAlgorithm.java
URL: http://svn.apache.org/viewvc/airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/SchedulingAlgorithm.java?rev=1455554&view=auto
==============================================================================
--- airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/SchedulingAlgorithm.java (added)
+++ airavata/trunk/modules/gfac-core/src/main/java/org/apache/airavata/gfac/ec2/SchedulingAlgorithm.java Tue Mar 12 14:41:26 2013
@@ -0,0 +1,15 @@
+package org.apache.airavata.gfac.ec2;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.services.ec2.AmazonEC2Client;
+
+import java.io.IOException;
+import java.security.NoSuchAlgorithmException;
+import java.security.spec.InvalidKeySpecException;
+
+public interface SchedulingAlgorithm {
+
+    String getScheduledAmazonInstance(AmazonEC2Client ec2client, String imageId, AWSCredentials credential)
+            throws NoSuchAlgorithmException, InvalidKeySpecException, IOException;
+}
+