You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/06 07:13:23 UTC

git commit: Moved load balancing statistics reporting logic to load balancer common component

Updated Branches:
  refs/heads/master a103b79a8 -> 73d4a873f


Moved load balancing statistics reporting logic to load balancer common component


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/73d4a873
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/73d4a873
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/73d4a873

Branch: refs/heads/master
Commit: 73d4a873f7c6879265f0ae6addadfc245e70a53c
Parents: a103b79
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Nov 6 11:43:10 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Wed Nov 6 11:43:10 2013 +0530

----------------------------------------------------------------------
 .../pom.xml                                     |  25 +++++
 .../statistics/LoadBalancingStatsCollector.java | 110 +++++++++++++++++++
 .../observers/WSO2CEPStatsObserver.java         | 102 +++++++++++++++++
 .../org.apache.stratos.load.balancer/pom.xml    |  32 +++---
 .../TenantAwareLoadBalanceEndpoint.java         |   2 +-
 .../balancer/mediators/ResponseInterceptor.java |   4 +-
 .../stat/LoadBalancingStatsCollector.java       | 110 -------------------
 .../stat/observers/WSO2CEPStatsObserver.java    | 102 -----------------
 8 files changed, 257 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/pom.xml b/components/org.apache.stratos.load.balancer.common/pom.xml
index 9ba9aac..996352a 100644
--- a/components/org.apache.stratos.load.balancer.common/pom.xml
+++ b/components/org.apache.stratos.load.balancer.common/pom.xml
@@ -49,6 +49,31 @@
             <artifactId>org.wso2.carbon.logging</artifactId>
             <version>${wso2carbon.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.stratos</groupId>
+            <artifactId>org.apache.stratos.messaging</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>0.9.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.carbon</groupId>
+            <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
+            <version>4.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.carbon</groupId>
+            <artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
+            <version>4.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.carbon</groupId>
+            <artifactId>org.wso2.carbon.databridge.commons</artifactId>
+            <version>4.2.0</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java
new file mode 100644
index 0000000..a55fc22
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.common.statistics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Observable;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.stratos.load.balancer.common.statistics.observers.WSO2CEPStatsObserver;
+
+/**
+ * Collects per-cluster in-flight request counts and, from a background thread,
+ * periodically passes a copy of those counts to the registered observers.
+ * This is a Singleton object; obtain it through {@link #getInstance()}.
+ * @author nirmal
+ *
+ */
+public class LoadBalancingStatsCollector extends Observable{
+
+	private static LoadBalancingStatsCollector collector; // lazily created singleton; NOTE(review): not volatile although getInstance() reads it outside the lock
+	private Map<String, Integer> clusterIdToRequestInflightCountMap; // cluster id -> current in-flight request count
+	private Thread notifier; // thread that runs ObserverNotifier
+	
+	private LoadBalancingStatsCollector() {
+		clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>();
+		if (notifier == null || (notifier != null && !notifier.isAlive())) { // notifier is still null in a fresh instance, so this branch is always taken
+			notifier = new Thread(new ObserverNotifier());
+			notifier.start(); // NOTE(review): the thread is started before the constructor returns
+		}
+    }
+	
+	public static LoadBalancingStatsCollector getInstance() { // returns the shared instance, creating it on first use
+		if (collector == null) { // first check, made without holding the lock
+			synchronized (LoadBalancingStatsCollector.class) {
+				if (collector == null) { // second check, made under the class lock
+					collector = new LoadBalancingStatsCollector();
+					// add observers
+					collector.addObserver(new WSO2CEPStatsObserver());
+				}
+			}
+		}
+		return collector;
+	}
+	
+	public void incrementRequestInflightCount(String clusterId) { // adds 1 to the count stored for clusterId
+		if(clusterId == null) {
+			return; // a null cluster id is silently ignored
+		}
+		
+		int value = 1; // value stored when the cluster has no entry yet
+		if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
+			value += clusterIdToRequestInflightCountMap.get(clusterId);
+		}
+		clusterIdToRequestInflightCountMap.put(clusterId, value); // NOTE(review): get-then-put is not atomic; concurrent callers can lose updates
+		setChanged(); // marks this Observable as changed for the next notifyObservers call
+	}
+	
+	public void decrementRequestInflightCount(String clusterId) { // subtracts 1 from the count stored for clusterId
+		if(clusterId == null) {
+			return; // a null cluster id is silently ignored
+		}
+		
+		int value = -1; // NOTE(review): a cluster with no entry yet ends up with a count of -1
+		if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
+			value += clusterIdToRequestInflightCountMap.get(clusterId);
+		}
+		clusterIdToRequestInflightCountMap.put(clusterId, value); // NOTE(review): same non-atomic get-then-put as in the increment method
+		setChanged(); // marks this Observable as changed for the next notifyObservers call
+	}
+	
+	
+	/**
+	 * Runnable for the notifier thread: loops forever, sleeping 15000 ms between rounds and
+	 * then handing a HashMap copy of the in-flight counts to notifyObservers.
+	 * @author nirmal
+	 */
+	private class ObserverNotifier implements Runnable {
+
+		@Override
+        public void run() {
+			while(true) { // no exit condition is present in this loop
+				try {
+	                Thread.sleep(15000);
+                } catch (InterruptedException ignore) { // NOTE(review): the interrupt is swallowed and the loop carries on
+                }
+				LoadBalancingStatsCollector.getInstance().notifyObservers(new HashMap<String, Integer>(clusterIdToRequestInflightCountMap)); // the argument is a copy taken at notification time
+			}
+	        
+        }
+		
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java
new file mode 100644
index 0000000..f53c5a5
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.common.statistics.observers;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Observable;
+import java.util.Observer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.databridge.agent.thrift.Agent;
+import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
+import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.utils.CarbonUtils;
+
+public class WSO2CEPStatsObserver implements Observer{ // Observer that publishes each received stats map as events through an AsyncDataPublisher
+
+	private static final Log log = LogFactory.getLog(WSO2CEPStatsObserver.class);
+	private static final String CALL_CENTER_DATA_STREAM = "stratos.lb.stats"; // stream name used for both the definition and publishing
+	private static final String VERSION = "1.0.0"; // stream version used for both the definition and publishing
+	private AsyncDataPublisher asyncDataPublisher; // created once in the constructor and reused for every publish call
+	
+	public WSO2CEPStatsObserver() { // sets SSL system properties, builds the publisher and adds the stream definition
+		AgentConfiguration agentConfiguration = new AgentConfiguration();
+        // TODO get following from somewhere, without hard-coding.
+        System.setProperty("javax.net.ssl.trustStore", CarbonUtils.getCarbonHome()+File.separator+"repository"+
+                           File.separator+"resources"+File.separator+"security"+File.separator+"client-truststore.jks"	); // NOTE(review): system properties are JVM-wide, so this overrides any earlier trust store setting
+        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon"); // hard-coded password, covered by the TODO above
+        
+        Agent agent = new Agent(agentConfiguration);
+        //TODO read following from a config file?
+        String ip = System.getProperty("thrift.receiver.ip"); // null when the property is not set
+        String port = System.getProperty("thrift.receiver.port"); // null when the property is not set
+        //Using Asynchronous data publisher
+        asyncDataPublisher = new AsyncDataPublisher("tcp://"+ip+":"+port+"", "admin", "admin", agent); // NOTE(review): unset properties give "tcp://null:null"; the two "admin" literals look like hard-coded credentials - confirm
+        String streamDefinition = "{" +
+                                  " 'name':'" + CALL_CENTER_DATA_STREAM + "'," +
+                                  " 'version':'" + VERSION + "'," +
+                                  " 'nickName': 'lb stats'," +
+                                  " 'description': 'lb stats'," +
+                                  " 'metaData':[]," +
+                                  " 'payloadData':[" +
+                                  " {'name':'cluster_id','type':'STRING'}," +
+                                  " {'name':'in_flight_requests','type':'INT'}" +
+                                  " ]" +
+                                  "}";
+        asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION); // definition is added under the same name and version used when publishing
+    }
+	
+	public void update(Observable arg0, Object arg1) { // Observer callback; arg0 is not used
+		if(arg1 != null && arg1 instanceof Map<?, ?>) { // the null check is redundant because instanceof is already false for null
+			Map<String, Integer> stats = (Map<String, Integer>)arg1; // unchecked cast: key and value types are not verified here
+			publishEvents(stats);
+		}
+	}
+	
+    private void publishEvents(Map<String, Integer> stats) { // publishes one event per map entry
+    	
+    	for (Map.Entry<String, Integer> entry : stats.entrySet()) {
+    		
+    		Object[] payload = new Object[]{entry.getKey(), entry.getValue()}; // same order as the definition's payloadData: cluster_id, in_flight_requests
+    		Event event = eventObject(null, null, payload, new HashMap<String, String>()); // no correlation data or meta data; empty arbitrary-data map
+    		try {
+    			asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event);
+    		} catch (AgentException e) {
+    			log.error("Failed to publish events. ", e); // the failure is logged and the loop moves on to the next entry
+    		}
+    		
+    	}
+    	stats = null; // only clears the local parameter; the caller's reference is unaffected
+    }
+    
+    private static Event eventObject(Object[] correlationData, Object[] metaData,
+                                     Object[] payLoadData, HashMap<String, String> map) { // builds an Event from the four given parts
+        Event event = new Event();
+        event.setCorrelationData(correlationData);
+        event.setMetaData(metaData);
+        event.setPayloadData(payLoadData);
+        event.setArbitraryDataMap(map);
+        return event;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/pom.xml b/components/org.apache.stratos.load.balancer/pom.xml
index a88806e..a763770 100644
--- a/components/org.apache.stratos.load.balancer/pom.xml
+++ b/components/org.apache.stratos.load.balancer/pom.xml
@@ -32,7 +32,7 @@
     <packaging>bundle</packaging>
     <name>Apache Stratos - Load Balancer</name>
     <url>http://apache.org</url>
-    
+
     <dependencies>
         <dependency>
             <groupId>org.wso2.carbon</groupId>
@@ -54,7 +54,7 @@
             <artifactId>org.wso2.carbon.logging</artifactId>
             <version>${wso2carbon.version}</version>
         </dependency>
-	<dependency>
+        <dependency>
             <groupId>org.apache.axis2.wso2</groupId>
             <artifactId>axis2</artifactId>
             <version>${axis2.wso2.version}</version>
@@ -70,6 +70,16 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.stratos</groupId>
+            <artifactId>org.apache.stratos.load.balancer.common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>0.9.1</version>
+        </dependency>
+        <dependency>
             <groupId>org.wso2.carbon</groupId>
             <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
             <version>4.2.0</version>
@@ -84,18 +94,12 @@
             <artifactId>org.wso2.carbon.databridge.commons</artifactId>
             <version>4.2.0</version>
         </dependency>
-        
-		<dependency>
-		  <groupId>org.apache.thrift</groupId>
-		  <artifactId>libthrift</artifactId>
-		  <version>0.9.1</version>
-		</dependency>
-		<dependency>
-			<groupId>commons-pool</groupId>
-			<artifactId>commons-pool</artifactId>
-			<version>1.6</version>
-		</dependency>
-        
+        <dependency>
+            <groupId>commons-pool</groupId>
+            <artifactId>commons-pool</artifactId>
+            <version>1.6</version>
+        </dependency>
+
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index a132b30..4a05335 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -24,7 +24,7 @@ import org.apache.axis2.description.TransportInDescription;
 import org.apache.http.protocol.HTTP;
 import org.apache.stratos.load.balancer.RequestDelegator;
 import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory;
-import org.apache.stratos.load.balancer.stat.LoadBalancingStatsCollector;
+import org.apache.stratos.load.balancer.common.statistics.LoadBalancingStatsCollector;
 import org.apache.stratos.load.balancer.util.Constants;
 import org.apache.stratos.messaging.domain.topology.Member;
 import org.apache.stratos.messaging.domain.topology.Port;

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
index fc6d84c..9108be5 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
@@ -18,13 +18,11 @@
  */
 package org.apache.stratos.load.balancer.mediators;
 
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.stratos.load.balancer.stat.LoadBalancingStatsCollector;
+import org.apache.stratos.load.balancer.common.statistics.LoadBalancingStatsCollector;
 import org.apache.stratos.load.balancer.util.Constants;
 import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.MessageContext;
 import org.apache.synapse.core.SynapseEnvironment;
-import org.apache.synapse.core.axis2.Axis2MessageContext;
 import org.apache.synapse.mediators.AbstractMediator;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/LoadBalancingStatsCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/LoadBalancingStatsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/LoadBalancingStatsCollector.java
deleted file mode 100644
index a0adc93..0000000
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/LoadBalancingStatsCollector.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.load.balancer.stat;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Observable;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.stratos.load.balancer.stat.observers.WSO2CEPStatsObserver;
-
-/**
- * This is the load balancing stats collector and any observer can get registered here 
- * and receive notifications periodically.
- * This is a Singleton object.
- * @author nirmal
- *
- */
-public class LoadBalancingStatsCollector extends Observable{
-
-	private static LoadBalancingStatsCollector collector;
-	private Map<String, Integer> clusterIdToRequestInflightCountMap;
-	private Thread notifier;
-	
-	private LoadBalancingStatsCollector() {
-		clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>();
-		if (notifier == null || (notifier != null && !notifier.isAlive())) {
-			notifier = new Thread(new ObserverNotifier());
-			notifier.start();
-		}
-    }
-	
-	public static LoadBalancingStatsCollector getInstance() {
-		if (collector == null) {
-			synchronized (LoadBalancingStatsCollector.class) {
-				if (collector == null) {
-					collector = new LoadBalancingStatsCollector();
-					// add observers
-					collector.addObserver(new WSO2CEPStatsObserver());
-				}
-			}
-		}
-		return collector;
-	}
-	
-	public void incrementRequestInflightCount(String clusterId) {
-		if(clusterId == null) {
-			return;
-		}
-		
-		int value = 1;
-		if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
-			value += clusterIdToRequestInflightCountMap.get(clusterId);
-		}
-		clusterIdToRequestInflightCountMap.put(clusterId, value);
-		setChanged();
-	}
-	
-	public void decrementRequestInflightCount(String clusterId) {
-		if(clusterId == null) {
-			return;
-		}
-		
-		int value = -1;
-		if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
-			value += clusterIdToRequestInflightCountMap.get(clusterId);
-		}
-		clusterIdToRequestInflightCountMap.put(clusterId, value);
-		setChanged();
-	}
-	
-	
-	/**
-	 * This thread will notify all the observers of this subject.
-	 * @author nirmal
-	 *
-	 */
-	private class ObserverNotifier implements Runnable {
-
-		@Override
-        public void run() {
-			while(true) {
-				try {
-	                Thread.sleep(15000);
-                } catch (InterruptedException ignore) {
-                }
-				LoadBalancingStatsCollector.getInstance().notifyObservers(new HashMap<String, Integer>(clusterIdToRequestInflightCountMap));
-			}
-	        
-        }
-		
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/observers/WSO2CEPStatsObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/observers/WSO2CEPStatsObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/observers/WSO2CEPStatsObserver.java
deleted file mode 100644
index 6093ee5..0000000
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/observers/WSO2CEPStatsObserver.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.load.balancer.stat.observers;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.wso2.carbon.databridge.agent.thrift.Agent;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
-import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
-import org.wso2.carbon.databridge.commons.Event;
-import org.wso2.carbon.utils.CarbonUtils;
-
-public class WSO2CEPStatsObserver implements Observer{
-
-	private static final Log log = LogFactory.getLog(WSO2CEPStatsObserver.class);
-	private static final String CALL_CENTER_DATA_STREAM = "stratos.lb.stats";
-	private static final String VERSION = "1.0.0";
-	private AsyncDataPublisher asyncDataPublisher;
-	
-	public WSO2CEPStatsObserver() {
-		AgentConfiguration agentConfiguration = new AgentConfiguration();
-        // TODO get following from somewhere, without hard-coding.
-        System.setProperty("javax.net.ssl.trustStore", CarbonUtils.getCarbonHome()+File.separator+"repository"+
-                           File.separator+"resources"+File.separator+"security"+File.separator+"client-truststore.jks"	);
-        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
-        
-        Agent agent = new Agent(agentConfiguration);
-        //TODO read following from a config file?
-        String ip = System.getProperty("thrift.receiver.ip");
-        String port = System.getProperty("thrift.receiver.port");
-        //Using Asynchronous data publisher
-        asyncDataPublisher = new AsyncDataPublisher("tcp://"+ip+":"+port+"", "admin", "admin", agent);
-        String streamDefinition = "{" +
-                                  " 'name':'" + CALL_CENTER_DATA_STREAM + "'," +
-                                  " 'version':'" + VERSION + "'," +
-                                  " 'nickName': 'lb stats'," +
-                                  " 'description': 'lb stats'," +
-                                  " 'metaData':[]," +
-                                  " 'payloadData':[" +
-                                  " {'name':'cluster_id','type':'STRING'}," +
-                                  " {'name':'in_flight_requests','type':'INT'}" +
-                                  " ]" +
-                                  "}";
-        asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION);
-    }
-	
-	public void update(Observable arg0, Object arg1) {
-		if(arg1 != null && arg1 instanceof Map<?, ?>) {
-			Map<String, Integer> stats = (Map<String, Integer>)arg1;
-			publishEvents(stats);
-		}
-	}
-	
-    private void publishEvents(Map<String, Integer> stats) {
-    	
-    	for (Map.Entry<String, Integer> entry : stats.entrySet()) {
-    		
-    		Object[] payload = new Object[]{entry.getKey(), entry.getValue()};
-    		Event event = eventObject(null, null, payload, new HashMap<String, String>());
-    		try {
-    			asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event);
-    		} catch (AgentException e) {
-    			log.error("Failed to publish events. ", e);
-    		}
-    		
-    	}
-    	stats = null;
-    }
-    
-    private static Event eventObject(Object[] correlationData, Object[] metaData,
-                                     Object[] payLoadData, HashMap<String, String> map) {
-        Event event = new Event();
-        event.setCorrelationData(correlationData);
-        event.setMetaData(metaData);
-        event.setPayloadData(payLoadData);
-        event.setArbitraryDataMap(map);
-        return event;
-    }
-}