You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/19 08:22:02 UTC
[1/2] git commit: Moved statistics publisher interface and its cep implementation to stratos common module
Updated Branches:
refs/heads/master ba1fe838b -> 880c346a1
Moved statistics publisher interface and its cep implementation to stratos common module
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/790b2ea2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/790b2ea2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/790b2ea2
Branch: refs/heads/master
Commit: 790b2ea29b4988cad11027a28ca2b9c74c2474c3
Parents: 12adc17
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Dec 19 12:50:23 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 19 12:50:23 2013 +0530
----------------------------------------------------------------------
components/org.apache.stratos.common/pom.xml | 22 +++-
.../pom.xml | 10 +-
.../statistics/LoadBalancerStatsPublisher.java | 45 --------
.../WSO2CEPFaultyMemberPublisher.java | 74 -------------
.../WSO2CEPInFlightRequestPublisher.java | 76 -------------
.../statistics/WSO2CEPStatsPublisher.java | 106 -------------------
.../publisher/WSO2CEPFaultyMemberPublisher.java | 75 +++++++++++++
.../WSO2CEPInFlightRequestPublisher.java | 77 ++++++++++++++
...oadBalancerInFlightRequestCountNotifier.java | 4 +-
.../WSO2CEPInFlightRequestCountObserver.java | 2 +-
10 files changed, 180 insertions(+), 311 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/pom.xml b/components/org.apache.stratos.common/pom.xml
index 27c2058..724989b 100644
--- a/components/org.apache.stratos.common/pom.xml
+++ b/components/org.apache.stratos.common/pom.xml
@@ -80,6 +80,26 @@
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
- </dependency>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.carbon</groupId>
+ <artifactId>org.wso2.carbon.databridge.commons</artifactId>
+ <version>4.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>commons-lang.wso2</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.6.0.wso2v1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>3.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.wso2.carbon</groupId>
+ <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
+ <version>4.1.0</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/pom.xml b/components/org.apache.stratos.load.balancer.common/pom.xml
index 996352a..e3bc211 100644
--- a/components/org.apache.stratos.load.balancer.common/pom.xml
+++ b/components/org.apache.stratos.load.balancer.common/pom.xml
@@ -51,6 +51,11 @@
</dependency>
<dependency>
<groupId>org.apache.stratos</groupId>
+ <artifactId>org.apache.stratos.common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.stratos</groupId>
<artifactId>org.apache.stratos.messaging</artifactId>
<version>${project.version}</version>
</dependency>
@@ -66,11 +71,6 @@
</dependency>
<dependency>
<groupId>org.wso2.carbon</groupId>
- <artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
- <version>4.2.0</version>
- </dependency>
- <dependency>
- <groupId>org.wso2.carbon</groupId>
<artifactId>org.wso2.carbon.databridge.commons</artifactId>
<version>4.2.0</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
deleted file mode 100644
index e6f8117..0000000
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer.common.statistics;
-
-/**
- * Load balancer statistics publisher interface.
- */
-public interface LoadBalancerStatsPublisher {
-
- /**
- * Set statistics publisher enabled or disabled.
- *
- * @param enabled
- */
- void setEnabled(boolean enabled);
-
- /**
- * Return enabled state of the statistics publisher.
- */
- boolean isEnabled();
-
- /**
- * Payload to be published.
- *
- * @param payload An array of parameter values.
- */
- void publish(Object[] payload);
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java
deleted file mode 100644
index 3225987..0000000
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer.common.statistics;
-
-import org.wso2.carbon.databridge.commons.Attribute;
-import org.wso2.carbon.databridge.commons.AttributeType;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * WSO2 CEP faulty member publisher.
- * <p/>
- * Faulty members:
- * If a request was rejected by some of the members in a cluster while at least
- * one member could serve it, those members could be identified as faulty.
- */
-public class WSO2CEPFaultyMemberPublisher extends WSO2CEPStatsPublisher {
-
- private static final String DATA_STREAM_NAME = "stratos.lb.faulty.members";
- private static final String VERSION = "1.0.0";
-
- private static StreamDefinition createStreamDefinition() {
- try {
- StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
- streamDefinition.setNickName("lb fault members");
- streamDefinition.setDescription("lb fault members");
- List<Attribute> payloadData = new ArrayList<Attribute>();
- // Payload definition
- payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
- payloadData.add(new Attribute("member_id", AttributeType.STRING));
- streamDefinition.setPayloadData(payloadData);
- return streamDefinition;
- } catch (Exception e) {
- throw new RuntimeException("Could not create stream definition", e);
- }
- }
-
- public WSO2CEPFaultyMemberPublisher() {
- super(createStreamDefinition());
- }
-
- /**
- * Publish faulty members.
- *
- * @param clusterId
- * @param memberId
- */
- public void publish(String clusterId, String memberId) {
- List<Object> payload = new ArrayList<Object>();
- // Payload values
- payload.add(clusterId);
- payload.add(memberId);
- super.publish(payload.toArray());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
deleted file mode 100644
index f8c6d40..0000000
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer.common.statistics;
-
-import org.wso2.carbon.databridge.commons.Attribute;
-import org.wso2.carbon.databridge.commons.AttributeType;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * WSO2 CEP in flight request count publisher.
- * <p/>
- * In-flight request count:
- * Number of requests being served at a given moment could be identified as in-flight request count.
- */
-public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher {
-
- private static final String DATA_STREAM_NAME = "in_flight_requests";
- private static final String VERSION = "1.0.0";
-
- private static StreamDefinition createStreamDefinition() {
- try {
- StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
- streamDefinition.setNickName("lb stats");
- streamDefinition.setDescription("lb stats");
- List<Attribute> payloadData = new ArrayList<Attribute>();
- // Payload definition
- payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
- payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
- payloadData.add(new Attribute("in_flight_request_count", AttributeType.INT));
- streamDefinition.setPayloadData(payloadData);
- return streamDefinition;
- } catch (Exception e) {
- throw new RuntimeException("Could not create stream definition", e);
- }
- }
-
- public WSO2CEPInFlightRequestPublisher() {
- super(createStreamDefinition());
- }
-
- /**
- * Publish in-flight request count of a cluster.
- *
- * @param clusterId
- * @param networkPartitionId
- * @param inFlightRequestCount
- */
- public void publish(String clusterId, String networkPartitionId, int inFlightRequestCount) {
- List<Object> payload = new ArrayList<Object>();
- // Payload values
- payload.add(clusterId);
- payload.add(networkPartitionId);
- payload.add(inFlightRequestCount);
- super.publish(payload.toArray());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
deleted file mode 100644
index 03df6f2..0000000
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer.common.statistics;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.wso2.carbon.databridge.agent.thrift.Agent;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
-import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
-import org.wso2.carbon.databridge.commons.Event;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-
-import java.util.HashMap;
-
-/**
- * WSO2 CEP statistics publisher for the load balancer.
- */
-public class WSO2CEPStatsPublisher implements LoadBalancerStatsPublisher {
- private static final Log log = LogFactory.getLog(WSO2CEPStatsPublisher.class);
-
- private StreamDefinition streamDefinition;
- private AsyncDataPublisher asyncDataPublisher;
- private String ip;
- private String port;
- private String username;
- private String password;
- private boolean enabled = false;
-
- public WSO2CEPStatsPublisher(StreamDefinition streamDefinition) {
- this.streamDefinition = streamDefinition;
- this.ip = System.getProperty("thrift.receiver.ip");
- this.port = System.getProperty("thrift.receiver.port");
- this.username = "admin";
- this.password = "admin";
- String enabledStr = System.getProperty("load.balancer.cep.stats.publisher.enabled");
- if (StringUtils.isNotBlank(enabledStr)) {
- enabled = Boolean.getBoolean(enabledStr);
- if (enabled) {
- init();
- }
- }
- }
-
- private void init() {
- AgentConfiguration agentConfiguration = new AgentConfiguration();
- Agent agent = new Agent(agentConfiguration);
-
- // Initialize asynchronous data publisher
- asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", username, password, agent);
- asyncDataPublisher.addStreamDefinition(streamDefinition);
- }
-
- @Override
- public void setEnabled(boolean enabled) {
- this.enabled = enabled;
- if (this.enabled) {
- init();
- }
- }
-
- @Override
- public boolean isEnabled() {
- return enabled;
- }
-
- @Override
- public void publish(Object[] payload) {
- if (!isEnabled()) {
- throw new RuntimeException("Statistics publisher is not enabled");
- }
-
- Event event = new Event();
- event.setPayloadData(payload);
- event.setArbitraryDataMap(new HashMap<String, String>());
-
- try {
- if (log.isDebugEnabled()) {
- log.debug(String.format("Publishing cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
- }
- asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
- } catch (AgentException e) {
- if (log.isErrorEnabled()) {
- log.error(String.format("Could not publish cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPFaultyMemberPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPFaultyMemberPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPFaultyMemberPublisher.java
new file mode 100644
index 0000000..cf21e5b
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPFaultyMemberPublisher.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.statistics.publisher;
+
+import org.apache.stratos.common.statistics.publisher.WSO2CEPStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * WSO2 CEP faulty member publisher.
+ * <p/>
+ * Faulty members:
+ * If a request was rejected by some of the members in a cluster while at least
+ * one member could serve it, those members could be identified as faulty.
+ */
+public class WSO2CEPFaultyMemberPublisher extends WSO2CEPStatisticsPublisher {
+
+ private static final String DATA_STREAM_NAME = "stratos.lb.faulty.members";
+ private static final String VERSION = "1.0.0";
+
+ private static StreamDefinition createStreamDefinition() {
+ try {
+ StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+ streamDefinition.setNickName("lb fault members");
+ streamDefinition.setDescription("lb fault members");
+ List<Attribute> payloadData = new ArrayList<Attribute>();
+ // Payload definition
+ payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
+ payloadData.add(new Attribute("member_id", AttributeType.STRING));
+ streamDefinition.setPayloadData(payloadData);
+ return streamDefinition;
+ } catch (Exception e) {
+ throw new RuntimeException("Could not create stream definition", e);
+ }
+ }
+
+ public WSO2CEPFaultyMemberPublisher() {
+ super(createStreamDefinition());
+ }
+
+ /**
+ * Publish faulty members.
+ *
+ * @param clusterId
+ * @param memberId
+ */
+ public void publish(String clusterId, String memberId) {
+ List<Object> payload = new ArrayList<Object>();
+ // Payload values
+ payload.add(clusterId);
+ payload.add(memberId);
+ super.publish(payload.toArray());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java
new file mode 100644
index 0000000..7e52ba0
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.statistics.publisher;
+
+import org.apache.stratos.common.statistics.publisher.WSO2CEPStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * WSO2 CEP in flight request count publisher.
+ * <p/>
+ * In-flight request count:
+ * Number of requests being served at a given moment could be identified as in-flight request count.
+ */
+public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher {
+
+ private static final String DATA_STREAM_NAME = "in_flight_requests";
+ private static final String VERSION = "1.0.0";
+
+ private static StreamDefinition createStreamDefinition() {
+ try {
+ StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+ streamDefinition.setNickName("lb stats");
+ streamDefinition.setDescription("lb stats");
+ List<Attribute> payloadData = new ArrayList<Attribute>();
+ // Payload definition
+ payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
+ payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
+ payloadData.add(new Attribute("in_flight_request_count", AttributeType.INT));
+ streamDefinition.setPayloadData(payloadData);
+ return streamDefinition;
+ } catch (Exception e) {
+ throw new RuntimeException("Could not create stream definition", e);
+ }
+ }
+
+ public WSO2CEPInFlightRequestPublisher() {
+ super(createStreamDefinition());
+ }
+
+ /**
+ * Publish in-flight request count of a cluster.
+ *
+ * @param clusterId
+ * @param networkPartitionId
+ * @param inFlightRequestCount
+ */
+ public void publish(String clusterId, String networkPartitionId, int inFlightRequestCount) {
+ List<Object> payload = new ArrayList<Object>();
+ // Payload values
+ payload.add(clusterId);
+ payload.add(networkPartitionId);
+ payload.add(inFlightRequestCount);
+ super.publish(payload.toArray());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
index 5fa3c71..c5bf52b 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
@@ -22,13 +22,11 @@ package org.apache.stratos.load.balancer.extension.api;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher;
+import org.apache.stratos.load.balancer.common.statistics.publisher.WSO2CEPInFlightRequestPublisher;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-import java.util.Collection;
-
/**
* Load balancer statistics notifier thread for publishing statistics periodically to CEP.
*/
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
index 390905e..47a2602 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
@@ -21,7 +21,7 @@ package org.apache.stratos.load.balancer.statistics.observers;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher;
+import org.apache.stratos.load.balancer.common.statistics.publisher.WSO2CEPInFlightRequestPublisher;
import java.util.Map;
import java.util.Observable;
[2/2] git commit: Merge remote-tracking branch 'origin/master'
Posted by im...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/880c346a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/880c346a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/880c346a
Branch: refs/heads/master
Commit: 880c346a1a3ca831158ce4a0c3003f74276c5baa
Parents: 790b2ea ba1fe83
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Dec 19 12:50:55 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 19 12:50:55 2013 +0530
----------------------------------------------------------------------
.../stratos/adc/mgt/deploy/service/Service.java | 5 +-
.../service/ServiceDeploymentManager.java | 15 +-
.../service/multitenant/MultiTenantService.java | 5 +-
.../mgt/listener/InstanceStatusListener.java | 19 +-
.../manager/CartridgeSubscriptionManager.java | 12 ++
.../adc/mgt/utils/PersistenceManager.java | 42 ++++
.../java/org/apache/stratos/cli/RestClient.java | 16 +-
.../stratos/cli/RestCommandLineService.java | 34 +--
.../rest/endpoint/services/ServiceUtils.java | 17 +-
.../extension/FaultHandlingWindowProcessor.java | 6 +-
.../cartridge-agent/ec2/php/cartridge-agent.sh | 12 +-
.../subscriber/CartridgeAgentConstants.java | 5 +
.../cartridge/agent/event/subscriber/Main.java | 207 ++++++++++++++-----
.../puppet/etc/puppet/files/cartridge-agent.sh | 12 +-
14 files changed, 297 insertions(+), 110 deletions(-)
----------------------------------------------------------------------