You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/11/06 07:13:23 UTC
git commit: Moved load balancing statistics reporting logic to load
balancer common component
Updated Branches:
refs/heads/master a103b79a8 -> 73d4a873f
Moved load balancing statistics reporting logic to load balancer common component
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/73d4a873
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/73d4a873
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/73d4a873
Branch: refs/heads/master
Commit: 73d4a873f7c6879265f0ae6addadfc245e70a53c
Parents: a103b79
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Nov 6 11:43:10 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Wed Nov 6 11:43:10 2013 +0530
----------------------------------------------------------------------
.../pom.xml | 25 +++++
.../statistics/LoadBalancingStatsCollector.java | 110 +++++++++++++++++++
.../observers/WSO2CEPStatsObserver.java | 102 +++++++++++++++++
.../org.apache.stratos.load.balancer/pom.xml | 32 +++---
.../TenantAwareLoadBalanceEndpoint.java | 2 +-
.../balancer/mediators/ResponseInterceptor.java | 4 +-
.../stat/LoadBalancingStatsCollector.java | 110 -------------------
.../stat/observers/WSO2CEPStatsObserver.java | 102 -----------------
8 files changed, 257 insertions(+), 230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/pom.xml b/components/org.apache.stratos.load.balancer.common/pom.xml
index 9ba9aac..996352a 100644
--- a/components/org.apache.stratos.load.balancer.common/pom.xml
+++ b/components/org.apache.stratos.load.balancer.common/pom.xml
@@ -49,6 +49,31 @@
<artifactId>org.wso2.carbon.logging</artifactId>
<version>${wso2carbon.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.stratos</groupId>
+ <artifactId>org.apache.stratos.messaging</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.9.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.carbon</groupId>
+ <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
+ <version>4.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.carbon</groupId>
+ <artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
+ <version>4.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.carbon</groupId>
+ <artifactId>org.wso2.carbon.databridge.commons</artifactId>
+ <version>4.2.0</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java
new file mode 100644
index 0000000..a55fc22
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancingStatsCollector.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.common.statistics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Observable;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.stratos.load.balancer.common.statistics.observers.WSO2CEPStatsObserver;
+
+/**
+ * This is the load balancing stats collector and any observer can get registered here
+ * and receive notifications periodically.
+ * This is a Singleton object.
+ * @author nirmal
+ *
+ */
+public class LoadBalancingStatsCollector extends Observable{
+
+ private static LoadBalancingStatsCollector collector;
+ private Map<String, Integer> clusterIdToRequestInflightCountMap;
+ private Thread notifier;
+
+ private LoadBalancingStatsCollector() {
+ clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>();
+ if (notifier == null || (notifier != null && !notifier.isAlive())) {
+ notifier = new Thread(new ObserverNotifier());
+ notifier.start();
+ }
+ }
+
+ public static LoadBalancingStatsCollector getInstance() {
+ if (collector == null) {
+ synchronized (LoadBalancingStatsCollector.class) {
+ if (collector == null) {
+ collector = new LoadBalancingStatsCollector();
+ // add observers
+ collector.addObserver(new WSO2CEPStatsObserver());
+ }
+ }
+ }
+ return collector;
+ }
+
+ public void incrementRequestInflightCount(String clusterId) {
+ if(clusterId == null) {
+ return;
+ }
+
+ int value = 1;
+ if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
+ value += clusterIdToRequestInflightCountMap.get(clusterId);
+ }
+ clusterIdToRequestInflightCountMap.put(clusterId, value);
+ setChanged();
+ }
+
+ public void decrementRequestInflightCount(String clusterId) {
+ if(clusterId == null) {
+ return;
+ }
+
+ int value = -1;
+ if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
+ value += clusterIdToRequestInflightCountMap.get(clusterId);
+ }
+ clusterIdToRequestInflightCountMap.put(clusterId, value);
+ setChanged();
+ }
+
+
+ /**
+ * This thread will notify all the observers of this subject.
+ * @author nirmal
+ *
+ */
+ private class ObserverNotifier implements Runnable {
+
+ @Override
+ public void run() {
+ while(true) {
+ try {
+ Thread.sleep(15000);
+ } catch (InterruptedException ignore) {
+ }
+ LoadBalancingStatsCollector.getInstance().notifyObservers(new HashMap<String, Integer>(clusterIdToRequestInflightCountMap));
+ }
+
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java
new file mode 100644
index 0000000..f53c5a5
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/observers/WSO2CEPStatsObserver.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.common.statistics.observers;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Observable;
+import java.util.Observer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.wso2.carbon.databridge.agent.thrift.Agent;
+import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
+import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
+import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
+import org.wso2.carbon.databridge.commons.Event;
+import org.wso2.carbon.utils.CarbonUtils;
+
+/**
+ * Observer that publishes the statistics map it is notified with to the
+ * "stratos.lb.stats" event stream through an {@link AsyncDataPublisher}. One event,
+ * carrying a cluster id and its in-flight request count, is sent per map entry.
+ */
+public class WSO2CEPStatsObserver implements Observer {
+
+    private static final Log log = LogFactory.getLog(WSO2CEPStatsObserver.class);
+    private static final String CALL_CENTER_DATA_STREAM = "stratos.lb.stats";
+    private static final String VERSION = "1.0.0";
+    private final AsyncDataPublisher asyncDataPublisher;
+
+    /**
+     * Creates the publisher and registers the stream definition. The receiver address
+     * is read from the system properties {@code thrift.receiver.ip} and
+     * {@code thrift.receiver.port}.
+     */
+    public WSO2CEPStatsObserver() {
+        AgentConfiguration agentConfiguration = new AgentConfiguration();
+        // TODO get following from somewhere, without hard-coding.
+        // NOTE(review): these calls overwrite the JVM-wide SSL trust store settings.
+        System.setProperty("javax.net.ssl.trustStore", CarbonUtils.getCarbonHome() + File.separator + "repository" +
+                File.separator + "resources" + File.separator + "security" + File.separator + "client-truststore.jks");
+        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
+
+        Agent agent = new Agent(agentConfiguration);
+        //TODO read following from a config file?
+        String ip = System.getProperty("thrift.receiver.ip");
+        String port = System.getProperty("thrift.receiver.port");
+        if (ip == null || port == null) {
+            // Without this the publisher is silently pointed at "tcp://null:null".
+            log.warn("System properties thrift.receiver.ip and/or thrift.receiver.port are not set; "
+                    + "load balancing statistics will be published to tcp://" + ip + ":" + port);
+        }
+        //Using Asynchronous data publisher
+        // NOTE(review): the credentials are hard-coded; they should come from configuration.
+        asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", "admin", "admin", agent);
+        String streamDefinition = "{" +
+                " 'name':'" + CALL_CENTER_DATA_STREAM + "'," +
+                " 'version':'" + VERSION + "'," +
+                " 'nickName': 'lb stats'," +
+                " 'description': 'lb stats'," +
+                " 'metaData':[]," +
+                " 'payloadData':[" +
+                " {'name':'cluster_id','type':'STRING'}," +
+                " {'name':'in_flight_requests','type':'INT'}" +
+                " ]" +
+                "}";
+        asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION);
+    }
+
+    /**
+     * Publishes the given statistics. Arguments that are not a {@link Map} (including
+     * {@code null}) are ignored.
+     *
+     * @param arg0 the observable that sent the notification (unused)
+     * @param arg1 expected to be a map of cluster id to in-flight request count
+     */
+    @Override
+    public void update(Observable arg0, Object arg1) {
+        // instanceof is false for null, so no separate null check is needed.
+        if (arg1 instanceof Map<?, ?>) {
+            // Only the raw Map type can be checked at runtime; the key/value types are
+            // taken on trust from the notifying collector.
+            @SuppressWarnings("unchecked")
+            Map<String, Integer> stats = (Map<String, Integer>) arg1;
+            publishEvents(stats);
+        }
+    }
+
+    /**
+     * Sends one event per entry of {@code stats}. A failure to publish one entry is
+     * logged and does not stop the remaining entries from being published.
+     */
+    private void publishEvents(Map<String, Integer> stats) {
+        for (Map.Entry<String, Integer> entry : stats.entrySet()) {
+            // Payload order follows the stream definition: cluster_id, in_flight_requests.
+            Object[] payload = new Object[]{entry.getKey(), entry.getValue()};
+            Event event = eventObject(null, null, payload, new HashMap<String, String>());
+            try {
+                asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event);
+            } catch (AgentException e) {
+                log.error("Failed to publish events. ", e);
+            }
+        }
+    }
+
+    /**
+     * Builds an {@link Event} from the given correlation data, meta data, payload data
+     * and arbitrary data map.
+     */
+    private static Event eventObject(Object[] correlationData, Object[] metaData,
+                                     Object[] payLoadData, HashMap<String, String> map) {
+        Event event = new Event();
+        event.setCorrelationData(correlationData);
+        event.setMetaData(metaData);
+        event.setPayloadData(payLoadData);
+        event.setArbitraryDataMap(map);
+        return event;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/pom.xml b/components/org.apache.stratos.load.balancer/pom.xml
index a88806e..a763770 100644
--- a/components/org.apache.stratos.load.balancer/pom.xml
+++ b/components/org.apache.stratos.load.balancer/pom.xml
@@ -32,7 +32,7 @@
<packaging>bundle</packaging>
<name>Apache Stratos - Load Balancer</name>
<url>http://apache.org</url>
-
+
<dependencies>
<dependency>
<groupId>org.wso2.carbon</groupId>
@@ -54,7 +54,7 @@
<artifactId>org.wso2.carbon.logging</artifactId>
<version>${wso2carbon.version}</version>
</dependency>
- <dependency>
+ <dependency>
<groupId>org.apache.axis2.wso2</groupId>
<artifactId>axis2</artifactId>
<version>${axis2.wso2.version}</version>
@@ -70,6 +70,16 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <groupId>org.apache.stratos</groupId>
+ <artifactId>org.apache.stratos.load.balancer.common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.9.1</version>
+ </dependency>
+ <dependency>
<groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
<version>4.2.0</version>
@@ -84,18 +94,12 @@
<artifactId>org.wso2.carbon.databridge.commons</artifactId>
<version>4.2.0</version>
</dependency>
-
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.9.1</version>
- </dependency>
- <dependency>
- <groupId>commons-pool</groupId>
- <artifactId>commons-pool</artifactId>
- <version>1.6</version>
- </dependency>
-
+ <dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ <version>1.6</version>
+ </dependency>
+
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index a132b30..4a05335 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -24,7 +24,7 @@ import org.apache.axis2.description.TransportInDescription;
import org.apache.http.protocol.HTTP;
import org.apache.stratos.load.balancer.RequestDelegator;
import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory;
-import org.apache.stratos.load.balancer.stat.LoadBalancingStatsCollector;
+import org.apache.stratos.load.balancer.common.statistics.LoadBalancingStatsCollector;
import org.apache.stratos.load.balancer.util.Constants;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.Port;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
index fc6d84c..9108be5 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
@@ -18,13 +18,11 @@
*/
package org.apache.stratos.load.balancer.mediators;
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.stratos.load.balancer.stat.LoadBalancingStatsCollector;
+import org.apache.stratos.load.balancer.common.statistics.LoadBalancingStatsCollector;
import org.apache.stratos.load.balancer.util.Constants;
import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.MessageContext;
import org.apache.synapse.core.SynapseEnvironment;
-import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.mediators.AbstractMediator;
/**
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/LoadBalancingStatsCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/LoadBalancingStatsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/LoadBalancingStatsCollector.java
deleted file mode 100644
index a0adc93..0000000
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/LoadBalancingStatsCollector.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.load.balancer.stat;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Observable;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.stratos.load.balancer.stat.observers.WSO2CEPStatsObserver;
-
-/**
- * This is the load balancing stats collector and any observer can get registered here
- * and receive notifications periodically.
- * This is a Singleton object.
- * @author nirmal
- *
- */
-public class LoadBalancingStatsCollector extends Observable{
-
- private static LoadBalancingStatsCollector collector;
- private Map<String, Integer> clusterIdToRequestInflightCountMap;
- private Thread notifier;
-
- private LoadBalancingStatsCollector() {
- clusterIdToRequestInflightCountMap = new ConcurrentHashMap<String, Integer>();
- if (notifier == null || (notifier != null && !notifier.isAlive())) {
- notifier = new Thread(new ObserverNotifier());
- notifier.start();
- }
- }
-
- public static LoadBalancingStatsCollector getInstance() {
- if (collector == null) {
- synchronized (LoadBalancingStatsCollector.class) {
- if (collector == null) {
- collector = new LoadBalancingStatsCollector();
- // add observers
- collector.addObserver(new WSO2CEPStatsObserver());
- }
- }
- }
- return collector;
- }
-
- public void incrementRequestInflightCount(String clusterId) {
- if(clusterId == null) {
- return;
- }
-
- int value = 1;
- if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
- value += clusterIdToRequestInflightCountMap.get(clusterId);
- }
- clusterIdToRequestInflightCountMap.put(clusterId, value);
- setChanged();
- }
-
- public void decrementRequestInflightCount(String clusterId) {
- if(clusterId == null) {
- return;
- }
-
- int value = -1;
- if(clusterIdToRequestInflightCountMap.get(clusterId) != null) {
- value += clusterIdToRequestInflightCountMap.get(clusterId);
- }
- clusterIdToRequestInflightCountMap.put(clusterId, value);
- setChanged();
- }
-
-
- /**
- * This thread will notify all the observers of this subject.
- * @author nirmal
- *
- */
- private class ObserverNotifier implements Runnable {
-
- @Override
- public void run() {
- while(true) {
- try {
- Thread.sleep(15000);
- } catch (InterruptedException ignore) {
- }
- LoadBalancingStatsCollector.getInstance().notifyObservers(new HashMap<String, Integer>(clusterIdToRequestInflightCountMap));
- }
-
- }
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/73d4a873/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/observers/WSO2CEPStatsObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/observers/WSO2CEPStatsObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/observers/WSO2CEPStatsObserver.java
deleted file mode 100644
index 6093ee5..0000000
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/stat/observers/WSO2CEPStatsObserver.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.load.balancer.stat.observers;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.wso2.carbon.databridge.agent.thrift.Agent;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
-import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
-import org.wso2.carbon.databridge.commons.Event;
-import org.wso2.carbon.utils.CarbonUtils;
-
-public class WSO2CEPStatsObserver implements Observer{
-
- private static final Log log = LogFactory.getLog(WSO2CEPStatsObserver.class);
- private static final String CALL_CENTER_DATA_STREAM = "stratos.lb.stats";
- private static final String VERSION = "1.0.0";
- private AsyncDataPublisher asyncDataPublisher;
-
- public WSO2CEPStatsObserver() {
- AgentConfiguration agentConfiguration = new AgentConfiguration();
- // TODO get following from somewhere, without hard-coding.
- System.setProperty("javax.net.ssl.trustStore", CarbonUtils.getCarbonHome()+File.separator+"repository"+
- File.separator+"resources"+File.separator+"security"+File.separator+"client-truststore.jks" );
- System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
-
- Agent agent = new Agent(agentConfiguration);
- //TODO read following from a config file?
- String ip = System.getProperty("thrift.receiver.ip");
- String port = System.getProperty("thrift.receiver.port");
- //Using Asynchronous data publisher
- asyncDataPublisher = new AsyncDataPublisher("tcp://"+ip+":"+port+"", "admin", "admin", agent);
- String streamDefinition = "{" +
- " 'name':'" + CALL_CENTER_DATA_STREAM + "'," +
- " 'version':'" + VERSION + "'," +
- " 'nickName': 'lb stats'," +
- " 'description': 'lb stats'," +
- " 'metaData':[]," +
- " 'payloadData':[" +
- " {'name':'cluster_id','type':'STRING'}," +
- " {'name':'in_flight_requests','type':'INT'}" +
- " ]" +
- "}";
- asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION);
- }
-
- public void update(Observable arg0, Object arg1) {
- if(arg1 != null && arg1 instanceof Map<?, ?>) {
- Map<String, Integer> stats = (Map<String, Integer>)arg1;
- publishEvents(stats);
- }
- }
-
- private void publishEvents(Map<String, Integer> stats) {
-
- for (Map.Entry<String, Integer> entry : stats.entrySet()) {
-
- Object[] payload = new Object[]{entry.getKey(), entry.getValue()};
- Event event = eventObject(null, null, payload, new HashMap<String, String>());
- try {
- asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event);
- } catch (AgentException e) {
- log.error("Failed to publish events. ", e);
- }
-
- }
- stats = null;
- }
-
- private static Event eventObject(Object[] correlationData, Object[] metaData,
- Object[] payLoadData, HashMap<String, String> map) {
- Event event = new Event();
- event.setCorrelationData(correlationData);
- event.setMetaData(metaData);
- event.setPayloadData(payLoadData);
- event.setArbitraryDataMap(map);
- return event;
- }
-}