You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/10 13:18:46 UTC

[1/2] git commit: Added partition id to in-flight request count sent from load balancer

Updated Branches:
  refs/heads/master de739901c -> 4bf745608


Added partition id to in-flight request count sent from load balancer


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/ca4b8f43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/ca4b8f43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/ca4b8f43

Branch: refs/heads/master
Commit: ca4b8f438e4906d9e38742918168c60a53e4dfc1
Parents: 522db60
Author: Imesh Gunaratne <im...@apache.org>
Authored: Tue Dec 10 17:47:57 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Tue Dec 10 17:47:57 2013 +0530

----------------------------------------------------------------------
 .../WSO2CEPInFlightRequestPublisher.java        |  9 ++-
 .../extension/api/LoadBalancerStatsReader.java  |  3 +-
 .../TenantAwareLoadBalanceEndpoint.java         | 31 +++++++--
 .../balancer/mediators/ResponseInterceptor.java | 26 +++++--
 ...adBalancerInFlightRequestCountCollector.java | 71 +++++++++++++-------
 .../WSO2CEPInFlightRequestCountObserver.java    | 36 ++++++----
 .../stratos/load/balancer/util/Constants.java   |  3 +
 .../haproxy/extension/HAProxyStatsReader.java   | 34 +++++-----
 8 files changed, 143 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
index f10907e..d41bb0b 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
@@ -30,8 +30,7 @@ import java.util.List;
  * WSO2 CEP in flight request count publisher.
  *
  * In-flight request count:
- * Number of requests being served at a given moment could be identified as
- * in-flight request count.
+ * Number of requests being served at a given moment could be identified as in-flight request count.
  */
 public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher {
 
@@ -46,6 +45,7 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher {
             List<Attribute> payloadData = new ArrayList<Attribute>();
             // Payload definition
             payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
+            payloadData.add(new Attribute("partition_id", AttributeType.STRING));
             payloadData.add(new Attribute("in_flight_requests", AttributeType.INT));
             streamDefinition.setPayloadData(payloadData);
             return streamDefinition;
@@ -61,13 +61,16 @@ public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher {
 
     /**
      * Publish in-flight request count of a cluster.
+     *
      * @param clusterId
+     * @param partitionId
      * @param inFlightRequestCount
      */
-    public void publish(String clusterId, int inFlightRequestCount) {
+    public void publish(String clusterId, String partitionId, int inFlightRequestCount) {
         List<Object> payload = new ArrayList<Object>();
         // Payload values
         payload.add(clusterId);
+        payload.add(partitionId);
         payload.add(inFlightRequestCount);
         super.publish(payload.toArray());
     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
index 2c6f324..a098ef0 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
@@ -29,6 +29,7 @@ public interface LoadBalancerStatsReader {
     /**
      * Get in-flight request count of a given cluster.
      * @param clusterId
+     * @param partitionId
      */
-    int getInFlightRequestCount(String clusterId);
+    int getInFlightRequestCount(String clusterId, String partitionId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index 8e3928e..9c97d4b 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -21,6 +21,7 @@ package org.apache.stratos.load.balancer.endpoint;
 
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.description.TransportInDescription;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.http.protocol.HTTP;
 import org.apache.stratos.load.balancer.RequestDelegator;
 import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory;
@@ -238,6 +239,9 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
         if (httpsPort != null)
             axis2Member.setHttpsPort(httpsPort.getValue());
         axis2Member.setActive(member.isActive());
+        // Set cluster id and partition id in message context
+        synCtx.setProperty(Constants.CLUSTER_ID, member.getClusterId());
+        synCtx.setProperty(Constants.PARTITION_ID, member.getPartitionId());
         return axis2Member;
     }
 
@@ -499,16 +503,14 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
         setupTransportHeaders(synCtx);
         setupLoadBalancerContextProperties(synCtx);
 
-        // Update health stats
-        LoadBalancerInFlightRequestCountCollector.getInstance().incrementRequestInflightCount(currentMember.getDomain());
-        // Set the cluster id in the message context
-        synCtx.setProperty(Constants.CLUSTER_ID, currentMember.getDomain());
-
         try {
             if (log.isDebugEnabled()) {
                 log.debug(String.format("Sending request to endpoint: %s", to.getAddress()));
             }
             endpoint.send(synCtx);
+
+            // Increment in-flight request count
+            incrementInFlightRequestCount(synCtx);
         } catch (Exception e) {
             if (e.getMessage().toLowerCase().contains("io reactor shutdown")) {
                 log.fatal("System cannot continue normal operation. Restarting", e);
@@ -519,6 +521,25 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
         }
     }
 
+    private void incrementInFlightRequestCount(MessageContext messageContext) {
+        try {
+            String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID);
+            if(StringUtils.isBlank(clusterId)) {
+                throw new RuntimeException("Cluster id not found in message context");
+            }
+            String partitionId = (String) messageContext.getProperty(Constants.PARTITION_ID);
+            if(StringUtils.isBlank(partitionId)) {
+                throw new RuntimeException("Partition id not found in message context");
+            }
+            LoadBalancerInFlightRequestCountCollector.getInstance().incrementInFlightRequestCount(clusterId, partitionId);
+        }
+        catch (Exception e) {
+            if(log.isDebugEnabled()) {
+                log.debug("Could not increment in-flight request count", e);
+            }
+        }
+    }
+
     public void setDispatcher(HttpSessionDispatcher dispatcher) {
         this.dispatcher = dispatcher;
     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
index ddf7eb5..1af0b9c 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
@@ -18,6 +18,7 @@
  */
 package org.apache.stratos.load.balancer.mediators;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.stratos.load.balancer.statistics.LoadBalancerInFlightRequestCountCollector;
 import org.apache.stratos.load.balancer.util.Constants;
 import org.apache.synapse.ManagedLifecycle;
@@ -30,19 +31,32 @@ import org.apache.synapse.mediators.AbstractMediator;
  */
 public class ResponseInterceptor extends AbstractMediator implements ManagedLifecycle {
 
-    public boolean mediate(MessageContext synCtx) {
-        if (log.isDebugEnabled()) {
-            log.debug("Mediation started " + ResponseInterceptor.class.getName());
+    public boolean mediate(MessageContext messageContext) {
+        try {
+            if (log.isDebugEnabled()) {
+                log.debug("Response interceptor mediation started");
+            }
+            String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID);
+            if (StringUtils.isBlank(clusterId)) {
+                throw new RuntimeException("Cluster id not found in message context");
+            }
+            String partitionId = (String) messageContext.getProperty(Constants.PARTITION_ID);
+            if (StringUtils.isBlank(partitionId)) {
+                throw new RuntimeException("Partition id not found in message context");
+            }
+            LoadBalancerInFlightRequestCountCollector.getInstance().decrementInFlightRequestCount(clusterId, partitionId);
+        } catch (Exception e) {
+            if(log.isErrorEnabled()) {
+                log.error("Could not decrement in-flight request count", e);
+            }
         }
-        String clusterId = (String) synCtx.getProperty(Constants.CLUSTER_ID);
-        LoadBalancerInFlightRequestCountCollector.getInstance().decrementRequestInflightCount(clusterId);
         return true;
     }
 
     @Override
     public void destroy() {
         if (log.isDebugEnabled()) {
-            log.debug("ResponseInterceptor mediator destroyed");
+            log.debug("Response interceptor mediator destroyed");
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
index cbe8e00..497b973 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
@@ -18,6 +18,8 @@
  */
 package org.apache.stratos.load.balancer.statistics;
 
+import org.apache.commons.lang.StringUtils;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.load.balancer.statistics.observers.WSO2CEPInFlightRequestCountObserver;
@@ -28,7 +30,7 @@ import java.util.Observable;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
- * This is the load balancing stats collector and any observer can get registered here
+ * This is the load balancing in-flight request count collector and any observer can get registered here
  * and receive notifications periodically.
  * This is a Singleton object.
  *
@@ -38,11 +40,12 @@ public class LoadBalancerInFlightRequestCountCollector extends Observable {
     private static final Log log = LogFactory.getLog(LoadBalancerInFlightRequestCountCollector.class);
 
     private static LoadBalancerInFlightRequestCountCollector collector;
-    private Map<String, Integer> clusterIdToRequestInflightCountMap;
+    // Map<ClusterId, Map<PartitionId, InFlightRequestCount>
+    private Map<String, Map<String, Integer>> inFlightRequestCountMap;
     private Thread notifier;
 
     private LoadBalancerInFlightRequestCountCollector() {
-        clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>();
+        inFlightRequestCountMap = new ConcurrentHashMap<String, Map<String, Integer>>();
         if (notifier == null || (notifier != null && !notifier.isAlive())) {
             notifier = new Thread(new ObserverNotifier());
             notifier.start();
@@ -62,45 +65,63 @@ public class LoadBalancerInFlightRequestCountCollector extends Observable {
         return collector;
     }
 
-    public void setRequestInflightCount(String clusterId, int value) {
-        if (clusterId == null) {
+    public int getInFlightRequestCount(String clusterId, String partitionId) {
+        if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) {
+            return -1;
+        }
+
+        Map<String, Integer> partitionMap = inFlightRequestCountMap.get(clusterId);
+        if (partitionMap == null) {
+            return 0;
+        }
+        if (partitionMap.containsKey(partitionId)) {
+            return partitionMap.get(partitionId);
+        }
+        return 0;
+    }
+
+    public void setInFlightRequestCount(String clusterId, String partitionId, int value) {
+        if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) {
             return;
         }
 
-        clusterIdToRequestInflightCountMap.put(clusterId, value);
+        Map<String, Integer> partitionMap = inFlightRequestCountMap.get(clusterId);
+        if (partitionMap == null) {
+            partitionMap = new HashMap<String, Integer>();
+            inFlightRequestCountMap.put(clusterId, partitionMap);
+        }
+        partitionMap.put(partitionId, value);
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("In-flight request count updated: [cluster] %s [partition] $s [value] %d", clusterId, partitionId, value));
+        }
         setChanged();
     }
 
-    public void incrementRequestInflightCount(String clusterId) {
-        incrementRequestInflightCount(clusterId, 1);
+    public void incrementInFlightRequestCount(String clusterId, String partitionId) {
+        incrementInFlightRequestCount(clusterId, partitionId, 1);
     }
 
-    public void incrementRequestInflightCount(String clusterId, int value) {
-        if (clusterId == null) {
+    public void incrementInFlightRequestCount(String clusterId, String partitionId, int value) {
+        if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) {
             return;
         }
 
-        if (clusterIdToRequestInflightCountMap.get(clusterId) != null) {
-            value += clusterIdToRequestInflightCountMap.get(clusterId);
-        }
-        clusterIdToRequestInflightCountMap.put(clusterId, value);
-        setChanged();
+        int count = getInFlightRequestCount(clusterId, partitionId);
+        setInFlightRequestCount(clusterId, partitionId, (count + value));
     }
 
-    public void decrementRequestInflightCount(String clusterId) {
-        decrementRequestInflightCount(clusterId, 1);
+    public void decrementInFlightRequestCount(String clusterId, String partitionId) {
+        decrementInFlightRequestCount(clusterId, partitionId, 1);
     }
 
-    public void decrementRequestInflightCount(String clusterId, int value) {
-        if (clusterId == null) {
+    public void decrementInFlightRequestCount(String clusterId, String partitionId, int value) {
+        if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(partitionId)) {
             return;
         }
 
-        if (clusterIdToRequestInflightCountMap.get(clusterId) != null) {
-            value += clusterIdToRequestInflightCountMap.get(clusterId);
-        }
-        clusterIdToRequestInflightCountMap.put(clusterId, value);
-        setChanged();
+        int count = getInFlightRequestCount(clusterId, partitionId);
+        int newValue = (count - value) < 0 ? 0 : (count - value);
+        setInFlightRequestCount(clusterId, partitionId, newValue);
     }
 
 
@@ -121,7 +142,7 @@ public class LoadBalancerInFlightRequestCountCollector extends Observable {
                     Thread.sleep(15000);
                 } catch (InterruptedException ignore) {
                 }
-                LoadBalancerInFlightRequestCountCollector.getInstance().notifyObservers(new HashMap<String, Integer>(clusterIdToRequestInflightCountMap));
+                LoadBalancerInFlightRequestCountCollector.getInstance().notifyObservers(new HashMap<String, Map<String, Integer>>(inFlightRequestCountMap));
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
index 8b7bf5a..f86643b 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
@@ -35,22 +35,30 @@ public class WSO2CEPInFlightRequestCountObserver implements Observer {
     }
 
     public void update(Observable observable, Object object) {
-        if (object != null && object instanceof Map<?, ?>) {
-            try {
-                if (publisher.isEnabled()) {
-                    Map<String, Integer> stats = (Map<String, Integer>) object;
-                    // Publish event per cluster id
-                    for (String clusterId : stats.keySet()) {
-                        // Publish event
-                        publisher.publish(clusterId, stats.get(clusterId));
+        try {
+            if (publisher.isEnabled()) {
+                Map<String, Map<String, Integer>> inFlightRequestCountMap = (Map<String, Map<String, Integer>>) object;
+                // Publish event per cluster id
+                Map<String, Integer> partitionMap = null;
+                for (String clusterId : inFlightRequestCountMap.keySet()) {
+                    partitionMap = inFlightRequestCountMap.get(clusterId);
+                    if (partitionMap != null) {
+                        for (String partitionId : partitionMap.keySet()) {
+                            // Publish event
+                            publisher.publish(clusterId, partitionId, partitionMap.get(partitionId));
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [partition] %s [value] %d",
+                                        clusterId, partitionId, partitionMap.get(partitionId)));
+                            }
+                        }
                     }
-                } else if (log.isWarnEnabled()) {
-                    log.warn("CEP statistics publisher is disabled");
-                }
-            } catch (Exception e) {
-                if (log.isErrorEnabled()) {
-                    log.error("Could not publish in-flight request count", e);
                 }
+            } else if (log.isWarnEnabled()) {
+                log.warn("CEP statistics publisher is disabled");
+            }
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Could not publish in-flight request count to cep", e);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
index 96fee1d..959ba90 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
@@ -21,8 +21,11 @@ package org.apache.stratos.load.balancer.util;
 public class Constants {
 
     public static final String CLUSTER_ID = "cluster_id";
+    public static final String PARTITION_ID = "partition_id";
+
     public static final String LB_HOST_NAME = "LB_HOST_NAME";
     public static final String LB_HTTP_PORT = "LB_HTTP_PORT";
     public static final String LB_HTTPS_PORT = "LB_HTTPS_PORT";
+
     public static final String AXIS2_MSG_CTX_TRANSPORT_IN_URL = "TransportInURL";
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca4b8f43/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
index 57c6bc0..43de7d6 100644
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
@@ -45,7 +45,7 @@ public class HAProxyStatsReader implements LoadBalancerStatsReader {
     }
 
     @Override
-    public int getInFlightRequestCount(String clusterId) {
+    public int getInFlightRequestCount(String clusterId, String partitionId) {
         String frontendId, backendId, command, output;
         String[] array;
         int totalWeight, weight;
@@ -63,23 +63,25 @@ public class HAProxyStatsReader implements LoadBalancerStatsReader {
                         backendId = frontendId + "-members";
 
                         for (Member member : cluster.getMembers()) {
-                            // echo "get weight <backend>/<server>" | socat stdio <stats-socket>
-                            command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
-                            try {
-                                output = CommandUtil.executeCommand(command);
-                                if ((output != null) && (output.length() > 0)) {
-                                    array = output.split(" ");
-                                    if ((array != null) && (array.length > 0)) {
-                                        weight = Integer.parseInt(array[0]);
-                                        if (log.isDebugEnabled()) {
-                                            log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight));
+                            if((member.getPartitionId() != null) && member.getPartitionId().equals(partitionId)) {
+                                // echo "get weight <backend>/<server>" | socat stdio <stats-socket>
+                                command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
+                                try {
+                                    output = CommandUtil.executeCommand(command);
+                                    if ((output != null) && (output.length() > 0)) {
+                                        array = output.split(" ");
+                                        if ((array != null) && (array.length > 0)) {
+                                            weight = Integer.parseInt(array[0]);
+                                            if (log.isDebugEnabled()) {
+                                                log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight));
+                                            }
+                                            totalWeight += weight;
                                         }
-                                        totalWeight += weight;
                                     }
-                                }
-                            } catch (IOException e) {
-                                if (log.isErrorEnabled()) {
-                                    log.error(e);
+                                } catch (IOException e) {
+                                    if (log.isErrorEnabled()) {
+                                        log.error(e);
+                                    }
                                 }
                             }
                         }


[2/2] git commit: Merge remote-tracking branch 'origin/master'

Posted by im...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/4bf74560
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/4bf74560
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/4bf74560

Branch: refs/heads/master
Commit: 4bf745608159ee765921d362d133b6d12808c4ee
Parents: ca4b8f4 de73990
Author: Imesh Gunaratne <im...@apache.org>
Authored: Tue Dec 10 17:48:39 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Tue Dec 10 17:48:39 2013 +0530

----------------------------------------------------------------------
 .../adc/mgt/client/AutoscalerServiceClient.java |    4 +-
 .../org/apache/stratos/adc/mgt/dao/Cluster.java |    4 +-
 .../adc/mgt/listener/TenantStatusListner.java   |   42 +
 .../ClusterIdToCartridgeSubscriptionMap.java    |   70 +
 .../adc/mgt/lookup/LookupDataHolder.java        |  254 ++
 ...criptionAliasToCartridgeSubscriptionMap.java |   71 +
 .../apache/stratos/adc/mgt/payload/Payload.java |    7 +-
 .../DatabaseBasedPersistenceManager.java        | 2816 +++++++++---------
 .../adc/mgt/persistence/PersistenceManager.java |   28 +-
 .../RegistryBasedPersistenceManager.java        |  125 +
 .../adc/mgt/registry/RegistryManager.java       |  138 +
 .../stratos/adc/mgt/repository/Repository.java  |    4 +-
 .../adc/mgt/retriever/DataRetrievalManager.java |  184 ++
 .../stratos/adc/mgt/subscriber/Subscriber.java  |    4 +-
 .../mgt/subscription/CartridgeSubscription.java |   12 +-
 .../tenancy/SubscriptionTenancyBehaviour.java   |    3 +-
 .../stratos/adc/mgt/utils/Deserializer.java     |   52 +
 .../stratos/adc/mgt/utils/Serializer.java       |   83 +
 .../bean/util/converter/PojoConverter.java      |   14 +-
 .../rest/endpoint/services/ServiceUtils.java    |    2 +-
 .../pom.xml                                     |    8 +-
 21 files changed, 2487 insertions(+), 1438 deletions(-)
----------------------------------------------------------------------