You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/19 08:22:02 UTC

[1/2] git commit: Moved statistics publisher interface and its cep implementation to stratos common module

Updated Branches:
  refs/heads/master ba1fe838b -> 880c346a1


Moved statistics publisher interface and its cep implementation to stratos common module


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/790b2ea2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/790b2ea2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/790b2ea2

Branch: refs/heads/master
Commit: 790b2ea29b4988cad11027a28ca2b9c74c2474c3
Parents: 12adc17
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Dec 19 12:50:23 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 19 12:50:23 2013 +0530

----------------------------------------------------------------------
 components/org.apache.stratos.common/pom.xml    |  22 +++-
 .../pom.xml                                     |  10 +-
 .../statistics/LoadBalancerStatsPublisher.java  |  45 --------
 .../WSO2CEPFaultyMemberPublisher.java           |  74 -------------
 .../WSO2CEPInFlightRequestPublisher.java        |  76 -------------
 .../statistics/WSO2CEPStatsPublisher.java       | 106 -------------------
 .../publisher/WSO2CEPFaultyMemberPublisher.java |  75 +++++++++++++
 .../WSO2CEPInFlightRequestPublisher.java        |  77 ++++++++++++++
 ...oadBalancerInFlightRequestCountNotifier.java |   4 +-
 .../WSO2CEPInFlightRequestCountObserver.java    |   2 +-
 10 files changed, 180 insertions(+), 311 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.common/pom.xml b/components/org.apache.stratos.common/pom.xml
index 27c2058..724989b 100644
--- a/components/org.apache.stratos.common/pom.xml
+++ b/components/org.apache.stratos.common/pom.xml
@@ -80,6 +80,26 @@
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
              <scope>test</scope>
-         </dependency>        
+         </dependency>
+        <dependency>
+            <groupId>org.wso2.carbon</groupId>
+            <artifactId>org.wso2.carbon.databridge.commons</artifactId>
+            <version>4.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang.wso2</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>2.6.0.wso2v1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.carbon</groupId>
+            <artifactId>org.wso2.carbon.databridge.agent.thrift</artifactId>
+            <version>4.1.0</version>
+        </dependency>
     </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/pom.xml b/components/org.apache.stratos.load.balancer.common/pom.xml
index 996352a..e3bc211 100644
--- a/components/org.apache.stratos.load.balancer.common/pom.xml
+++ b/components/org.apache.stratos.load.balancer.common/pom.xml
@@ -51,6 +51,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.stratos</groupId>
+            <artifactId>org.apache.stratos.common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.stratos</groupId>
             <artifactId>org.apache.stratos.messaging</artifactId>
             <version>${project.version}</version>
         </dependency>
@@ -66,11 +71,6 @@
         </dependency>
         <dependency>
             <groupId>org.wso2.carbon</groupId>
-            <artifactId>org.wso2.carbon.databridge.commons.thrift</artifactId>
-            <version>4.2.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.wso2.carbon</groupId>
             <artifactId>org.wso2.carbon.databridge.commons</artifactId>
             <version>4.2.0</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
deleted file mode 100644
index e6f8117..0000000
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatsPublisher.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer.common.statistics;
-
-/**
- * Load balancer statistics publisher interface.
- */
-public interface LoadBalancerStatsPublisher {
-
-    /**
-     * Set statistics publisher enabled or disabled.
-     *
-     * @param enabled
-     */
-    void setEnabled(boolean enabled);
-
-    /**
-     * Return enabled state of the statistics publisher.
-     */
-    boolean isEnabled();
-
-    /**
-     * Payload to be published.
-     *
-     * @param payload An array of parameter values.
-     */
-    void publish(Object[] payload);
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java
deleted file mode 100644
index 3225987..0000000
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPFaultyMemberPublisher.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer.common.statistics;
-
-import org.wso2.carbon.databridge.commons.Attribute;
-import org.wso2.carbon.databridge.commons.AttributeType;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * WSO2 CEP faulty member publisher.
- * <p/>
- * Faulty members:
- * If a request was rejected by some of the members in a cluster while at least
- * one member could serve it, those members could be identified as faulty.
- */
-public class WSO2CEPFaultyMemberPublisher extends WSO2CEPStatsPublisher {
-
-    private static final String DATA_STREAM_NAME = "stratos.lb.faulty.members";
-    private static final String VERSION = "1.0.0";
-
-    private static StreamDefinition createStreamDefinition() {
-        try {
-            StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
-            streamDefinition.setNickName("lb fault members");
-            streamDefinition.setDescription("lb fault members");
-            List<Attribute> payloadData = new ArrayList<Attribute>();
-            // Payload definition
-            payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
-            payloadData.add(new Attribute("member_id", AttributeType.STRING));
-            streamDefinition.setPayloadData(payloadData);
-            return streamDefinition;
-        } catch (Exception e) {
-            throw new RuntimeException("Could not create stream definition", e);
-        }
-    }
-
-    public WSO2CEPFaultyMemberPublisher() {
-        super(createStreamDefinition());
-    }
-
-    /**
-     * Publish faulty members.
-     *
-     * @param clusterId
-     * @param memberId
-     */
-    public void publish(String clusterId, String memberId) {
-        List<Object> payload = new ArrayList<Object>();
-        // Payload values
-        payload.add(clusterId);
-        payload.add(memberId);
-        super.publish(payload.toArray());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
deleted file mode 100644
index f8c6d40..0000000
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPInFlightRequestPublisher.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer.common.statistics;
-
-import org.wso2.carbon.databridge.commons.Attribute;
-import org.wso2.carbon.databridge.commons.AttributeType;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * WSO2 CEP in flight request count publisher.
- * <p/>
- * In-flight request count:
- * Number of requests being served at a given moment could be identified as in-flight request count.
- */
-public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatsPublisher {
-
-    private static final String DATA_STREAM_NAME = "in_flight_requests";
-    private static final String VERSION = "1.0.0";
-
-    private static StreamDefinition createStreamDefinition() {
-        try {
-            StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
-            streamDefinition.setNickName("lb stats");
-            streamDefinition.setDescription("lb stats");
-            List<Attribute> payloadData = new ArrayList<Attribute>();
-            // Payload definition
-            payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
-            payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
-            payloadData.add(new Attribute("in_flight_request_count", AttributeType.INT));
-            streamDefinition.setPayloadData(payloadData);
-            return streamDefinition;
-        } catch (Exception e) {
-            throw new RuntimeException("Could not create stream definition", e);
-        }
-    }
-
-    public WSO2CEPInFlightRequestPublisher() {
-        super(createStreamDefinition());
-    }
-
-    /**
-     * Publish in-flight request count of a cluster.
-     *
-     * @param clusterId
-     * @param networkPartitionId
-     * @param inFlightRequestCount
-     */
-    public void publish(String clusterId, String networkPartitionId, int inFlightRequestCount) {
-        List<Object> payload = new ArrayList<Object>();
-        // Payload values
-        payload.add(clusterId);
-        payload.add(networkPartitionId);
-        payload.add(inFlightRequestCount);
-        super.publish(payload.toArray());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
deleted file mode 100644
index 03df6f2..0000000
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/WSO2CEPStatsPublisher.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer.common.statistics;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.wso2.carbon.databridge.agent.thrift.Agent;
-import org.wso2.carbon.databridge.agent.thrift.AsyncDataPublisher;
-import org.wso2.carbon.databridge.agent.thrift.conf.AgentConfiguration;
-import org.wso2.carbon.databridge.agent.thrift.exception.AgentException;
-import org.wso2.carbon.databridge.commons.Event;
-import org.wso2.carbon.databridge.commons.StreamDefinition;
-
-import java.util.HashMap;
-
-/**
- * WSO2 CEP statistics publisher for the load balancer.
- */
-public class WSO2CEPStatsPublisher implements LoadBalancerStatsPublisher {
-    private static final Log log = LogFactory.getLog(WSO2CEPStatsPublisher.class);
-
-    private StreamDefinition streamDefinition;
-    private AsyncDataPublisher asyncDataPublisher;
-    private String ip;
-    private String port;
-    private String username;
-    private String password;
-    private boolean enabled = false;
-
-    public WSO2CEPStatsPublisher(StreamDefinition streamDefinition) {
-        this.streamDefinition = streamDefinition;
-        this.ip = System.getProperty("thrift.receiver.ip");
-        this.port = System.getProperty("thrift.receiver.port");
-        this.username = "admin";
-        this.password = "admin";
-        String enabledStr = System.getProperty("load.balancer.cep.stats.publisher.enabled");
-        if (StringUtils.isNotBlank(enabledStr)) {
-            enabled = Boolean.getBoolean(enabledStr);
-            if (enabled) {
-                init();
-            }
-        }
-    }
-
-    private void init() {
-        AgentConfiguration agentConfiguration = new AgentConfiguration();
-        Agent agent = new Agent(agentConfiguration);
-
-        // Initialize asynchronous data publisher
-        asyncDataPublisher = new AsyncDataPublisher("tcp://" + ip + ":" + port + "", username, password, agent);
-        asyncDataPublisher.addStreamDefinition(streamDefinition);
-    }
-
-    @Override
-    public void setEnabled(boolean enabled) {
-        this.enabled = enabled;
-        if (this.enabled) {
-            init();
-        }
-    }
-
-    @Override
-    public boolean isEnabled() {
-        return enabled;
-    }
-
-    @Override
-    public void publish(Object[] payload) {
-        if (!isEnabled()) {
-            throw new RuntimeException("Statistics publisher is not enabled");
-        }
-
-        Event event = new Event();
-        event.setPayloadData(payload);
-        event.setArbitraryDataMap(new HashMap<String, String>());
-
-        try {
-            if (log.isDebugEnabled()) {
-                log.debug(String.format("Publishing cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()));
-            }
-            asyncDataPublisher.publish(streamDefinition.getName(), streamDefinition.getVersion(), event);
-        } catch (AgentException e) {
-            if (log.isErrorEnabled()) {
-                log.error(String.format("Could not publish cep event: [stream] %s [version] %s", streamDefinition.getName(), streamDefinition.getVersion()), e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPFaultyMemberPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPFaultyMemberPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPFaultyMemberPublisher.java
new file mode 100644
index 0000000..cf21e5b
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPFaultyMemberPublisher.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.statistics.publisher;
+
+import org.apache.stratos.common.statistics.publisher.WSO2CEPStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * WSO2 CEP faulty member publisher.
+ * <p/>
+ * Faulty members:
+ * If a request was rejected by some of the members in a cluster while at least
+ * one member could serve it, those members could be identified as faulty.
+ */
+public class WSO2CEPFaultyMemberPublisher extends WSO2CEPStatisticsPublisher {
+
+    private static final String DATA_STREAM_NAME = "stratos.lb.faulty.members";
+    private static final String VERSION = "1.0.0";
+
+    private static StreamDefinition createStreamDefinition() {
+        try {
+            StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+            streamDefinition.setNickName("lb fault members");
+            streamDefinition.setDescription("lb fault members");
+            List<Attribute> payloadData = new ArrayList<Attribute>();
+            // Payload definition
+            payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
+            payloadData.add(new Attribute("member_id", AttributeType.STRING));
+            streamDefinition.setPayloadData(payloadData);
+            return streamDefinition;
+        } catch (Exception e) {
+            throw new RuntimeException("Could not create stream definition", e);
+        }
+    }
+
+    public WSO2CEPFaultyMemberPublisher() {
+        super(createStreamDefinition());
+    }
+
+    /**
+     * Publish faulty members.
+     *
+     * @param clusterId
+     * @param memberId
+     */
+    public void publish(String clusterId, String memberId) {
+        List<Object> payload = new ArrayList<Object>();
+        // Payload values
+        payload.add(clusterId);
+        payload.add(memberId);
+        super.publish(payload.toArray());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java
new file mode 100644
index 0000000..7e52ba0
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/publisher/WSO2CEPInFlightRequestPublisher.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.statistics.publisher;
+
+import org.apache.stratos.common.statistics.publisher.WSO2CEPStatisticsPublisher;
+import org.wso2.carbon.databridge.commons.Attribute;
+import org.wso2.carbon.databridge.commons.AttributeType;
+import org.wso2.carbon.databridge.commons.StreamDefinition;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * WSO2 CEP in flight request count publisher.
+ * <p/>
+ * In-flight request count:
+ * Number of requests being served at a given moment could be identified as in-flight request count.
+ */
+public class WSO2CEPInFlightRequestPublisher extends WSO2CEPStatisticsPublisher {
+
+    private static final String DATA_STREAM_NAME = "in_flight_requests";
+    private static final String VERSION = "1.0.0";
+
+    private static StreamDefinition createStreamDefinition() {
+        try {
+            StreamDefinition streamDefinition = new StreamDefinition(DATA_STREAM_NAME, VERSION);
+            streamDefinition.setNickName("lb stats");
+            streamDefinition.setDescription("lb stats");
+            List<Attribute> payloadData = new ArrayList<Attribute>();
+            // Payload definition
+            payloadData.add(new Attribute("cluster_id", AttributeType.STRING));
+            payloadData.add(new Attribute("network_partition_id", AttributeType.STRING));
+            payloadData.add(new Attribute("in_flight_request_count", AttributeType.INT));
+            streamDefinition.setPayloadData(payloadData);
+            return streamDefinition;
+        } catch (Exception e) {
+            throw new RuntimeException("Could not create stream definition", e);
+        }
+    }
+
+    public WSO2CEPInFlightRequestPublisher() {
+        super(createStreamDefinition());
+    }
+
+    /**
+     * Publish in-flight request count of a cluster.
+     *
+     * @param clusterId
+     * @param networkPartitionId
+     * @param inFlightRequestCount
+     */
+    public void publish(String clusterId, String networkPartitionId, int inFlightRequestCount) {
+        List<Object> payload = new ArrayList<Object>();
+        // Payload values
+        payload.add(clusterId);
+        payload.add(networkPartitionId);
+        payload.add(inFlightRequestCount);
+        super.publish(payload.toArray());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
index 5fa3c71..c5bf52b 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
@@ -22,13 +22,11 @@ package org.apache.stratos.load.balancer.extension.api;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher;
+import org.apache.stratos.load.balancer.common.statistics.publisher.WSO2CEPInFlightRequestPublisher;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
-import java.util.Collection;
-
 /**
  * Load balancer statistics notifier thread for publishing statistics periodically to CEP.
  */

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/790b2ea2/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
index 390905e..47a2602 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
@@ -21,7 +21,7 @@ package org.apache.stratos.load.balancer.statistics.observers;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher;
+import org.apache.stratos.load.balancer.common.statistics.publisher.WSO2CEPInFlightRequestPublisher;
 
 import java.util.Map;
 import java.util.Observable;


[2/2] git commit: Merge remote-tracking branch 'origin/master'

Posted by im...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/880c346a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/880c346a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/880c346a

Branch: refs/heads/master
Commit: 880c346a1a3ca831158ce4a0c3003f74276c5baa
Parents: 790b2ea ba1fe83
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Dec 19 12:50:55 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Dec 19 12:50:55 2013 +0530

----------------------------------------------------------------------
 .../stratos/adc/mgt/deploy/service/Service.java |   5 +-
 .../service/ServiceDeploymentManager.java       |  15 +-
 .../service/multitenant/MultiTenantService.java |   5 +-
 .../mgt/listener/InstanceStatusListener.java    |  19 +-
 .../manager/CartridgeSubscriptionManager.java   |  12 ++
 .../adc/mgt/utils/PersistenceManager.java       |  42 ++++
 .../java/org/apache/stratos/cli/RestClient.java |  16 +-
 .../stratos/cli/RestCommandLineService.java     |  34 +--
 .../rest/endpoint/services/ServiceUtils.java    |  17 +-
 .../extension/FaultHandlingWindowProcessor.java |   6 +-
 .../cartridge-agent/ec2/php/cartridge-agent.sh  |  12 +-
 .../subscriber/CartridgeAgentConstants.java     |   5 +
 .../cartridge/agent/event/subscriber/Main.java  | 207 ++++++++++++++-----
 .../puppet/etc/puppet/files/cartridge-agent.sh  |  12 +-
 14 files changed, 297 insertions(+), 110 deletions(-)
----------------------------------------------------------------------