You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ma...@apache.org on 2013/11/21 11:49:06 UTC
git commit: CEP extension to detect inactive health publishers
Updated Branches:
refs/heads/master f02cc2bf9 -> ff900c43c
CEP extension to detect inactive health publishers
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/ff900c43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/ff900c43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/ff900c43
Branch: refs/heads/master
Commit: ff900c43c7cb050a624994a6d6e08e26247363f5
Parents: f02cc2b
Author: Manula Thantriwatte <ma...@apache.org>
Authored: Thu Nov 21 16:11:35 2013 +0530
Committer: Manula Thantriwatte <ma...@apache.org>
Committed: Thu Nov 21 16:11:35 2013 +0530
----------------------------------------------------------------------
.../extension/FaultHandlingWindowProcessor.java | 180 +++++++++++++++++++
.../src/main/bin/health-publisher.sh | 2 +-
.../agent/health/publisher/HealthPublisher.java | 23 +--
.../health/publisher/HealthPublisherClient.java | 54 +++---
.../cartridge/agent/health/publisher/Main.java | 26 +--
5 files changed, 234 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
new file mode 100644
index 0000000..a995331
--- /dev/null
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cep.extension;
+
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.event.StreamEvent;
+import org.wso2.siddhi.core.event.in.InEvent;
+import org.wso2.siddhi.core.event.in.InListEvent;
+import org.wso2.siddhi.core.persistence.ThreadBarrier;
+import org.wso2.siddhi.core.query.QueryPostProcessingElement;
+import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
+import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Siddhi window extension that detects members whose health publisher has gone quiet.
+ * <p>
+ * The most recent event of every member is cached, keyed by the value of the stream
+ * attribute named by the second query parameter. Each scheduled sweep ({@link #run()})
+ * evicts the cached event of any member that has been silent for more than
+ * {@code TIME_OUT} seconds and forwards it to the next processor.
+ */
+@SiddhiExtension(namespace = "stratos", function = "faultHandling")
+public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    /** Milliseconds per second; converts an event's age before comparing it with TIME_OUT. */
+    private static final int MILLIS_PER_SECOND = 1000;
+    /** Whole seconds of silence after which a member is reported as inactive. */
+    private static final int TIME_OUT = 100;
+
+    static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
+    private ScheduledExecutorService eventRemoverScheduler;
+    /** Position of the member-id attribute within an event's data array. */
+    private int subjectedAttrIndex;
+    private ThreadBarrier threadBarrier;
+    /** Delay between sweeps, in milliseconds (first query parameter). */
+    private long timeToKeep;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+    /** Latest event seen per member id. */
+    private final ConcurrentHashMap<String, InEvent> timeStampMap = new ConcurrentHashMap<String, InEvent>();
+
+    /** Records a single event as the latest sign of life of its member. */
+    @Override
+    protected void processEvent(InEvent event) {
+        addDataToMap(event);
+    }
+
+    /** Records every active event of the batch as the latest sign of life of its member. */
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+            addDataToMap((InEvent) listEvent.getEvent(i));
+        }
+    }
+
+    /**
+     * Caches {@code event} under the member id it carries, replacing any older event of
+     * that member. Events whose member id is null are logged and dropped, because
+     * {@link ConcurrentHashMap} rejects null keys.
+     */
+    protected void addDataToMap(InEvent event) {
+        String id = (String) event.getData()[subjectedAttrIndex];
+        if (id == null) {
+            log.error("NULL Member ID");
+            return;
+        }
+        timeStampMap.put(id, event);
+    }
+
+    /** Returns an iterator over the backing window queue. */
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    /**
+     * Returns a predicate-filtered iterator when distributed processing is enabled,
+     * otherwise the plain window iterator (the predicate is ignored).
+     */
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        }
+        return window.iterator();
+    }
+
+    /**
+     * Performs one sweep over the cached events: every member silent for more than
+     * {@code TIME_OUT} seconds is logged, evicted, and its last event is passed to the
+     * next processor. The sweep then re-arms itself through {@link #schedule()}, so
+     * the scheduler thread is released between sweeps instead of spinning in an
+     * endless loop.
+     */
+    @Override
+    public void run() {
+        try {
+            threadBarrier.pass();
+            Iterator<Map.Entry<String, InEvent>> it = timeStampMap.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<String, InEvent> pair = it.next();
+                InEvent event = pair.getValue();
+                long currentTime = System.currentTimeMillis();
+
+                if ((currentTime - event.getTimeStamp()) / MILLIS_PER_SECOND > TIME_OUT) {
+                    log.info("Member Inactive : " + pair.getKey());
+                    it.remove();
+                    nextProcessor.process(event);
+                }
+            }
+        } catch (Throwable t) {
+            // Log and carry on so that one failed sweep does not end fault detection.
+            log.error(t.getMessage(), t);
+        } finally {
+            schedule();
+        }
+    }
+
+    /** Snapshots the backing window's state, wrapped in a single-element array. */
+    @Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState()};
+    }
+
+    /** Restores the backing window from a snapshot and asks it to reschedule. */
+    @Override
+    protected void restoreState(Object[] data) {
+        // NOTE(review): this first call receives the wrapper array; the call below
+        // looks like the intended restore -- confirm whether this one can be removed.
+        window.restoreState(data);
+        window.restoreState((Object[]) data[0]);
+        window.reSchedule();
+    }
+
+    /**
+     * Reads the query parameters and creates the backing window.
+     *
+     * @param parameters {@code [0]} delay between sweeps in milliseconds (int or long
+     *                   constant); {@code [1]} variable naming the member-id attribute
+     * @param streamDefinition definition used to resolve the attribute's position
+     * @param elementId id handed to the distributed queue implementation
+     */
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+
+        String subjectedAttr = ((Variable) parameters[1]).getAttributeName();
+        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+
+        //Ordinary scheduling
+        window.schedule();
+    }
+
+    /** Schedules the next sweep {@code timeToKeep} milliseconds from now. */
+    @Override
+    public void schedule() {
+        eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    /** Schedules a sweep with no delay. */
+    public void scheduleNow() {
+        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    /** Supplies the executor on which sweeps are scheduled. */
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.eventRemoverScheduler = scheduledExecutorService;
+    }
+
+    /** Supplies the barrier passed at the start of every sweep. */
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    /** Drops the reference to the backing window. */
+    @Override
+    public void destroy(){
+        window = null;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh
----------------------------------------------------------------------
diff --git a/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh b/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh
index 2573f32..f5c0104 100755
--- a/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh
+++ b/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh
@@ -30,5 +30,5 @@ current_path=`pwd`
java -cp $class_path -Dmember.id=$1 -Dkey.file.path=$current_path/../security/client-truststore.jks -Dthrift.receiver.ip=$2 -Dthrift.receiver.port=$3 org.apache.stratos.cartridge.agent.health.publisher.Main $*
-echo "Health publisher completed"
+echo "Health publisher completed"
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java
----------------------------------------------------------------------
diff --git a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java
index 7d56d06..671aa85 100644
--- a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java
+++ b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java
@@ -1,18 +1,18 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
@@ -69,7 +69,8 @@ public class HealthPublisher implements Observer {
" 'metaData':[]," +
" 'payloadData':[" +
" {'name':'health_description','type':'STRING'}," +
- " {'name':'value','type':'INT'}" +
+ " {'name':'value','type':'DOUBLE'}," +
+ " {'name':'member_id','type':'STRING'}" +
" ]" +
"}";
asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION);
@@ -83,23 +84,25 @@ public class HealthPublisher implements Observer {
public void update(Observable arg0, Object arg1) {
if (arg1 != null && arg1 instanceof Map<?, ?>) {
- Map<String, Integer> stats = (Map<String, Integer>) arg1;
+ Map<String, Double> stats = (Map<String, Double>) arg1;
publishEvents(stats);
}
}
public void update(Object healthStatObj) {
if (healthStatObj != null && healthStatObj instanceof Map<?, ?>) {
- Map<String, Integer> stats = (Map<String, Integer>) healthStatObj;
+ Map<String, Double> stats = (Map<String, Double>) healthStatObj;
publishEvents(stats);
}
}
- private void publishEvents(Map<String, Integer> stats) {
+ private void publishEvents(Map<String, Double> stats) {
+
+ String memberID = System.getProperty("member.id");
- for (Map.Entry<String, Integer> entry : stats.entrySet()) {
+ for (Map.Entry<String, Double> entry : stats.entrySet()) {
- Object[] payload = new Object[]{entry.getKey(), entry.getValue()};
+ Object[] payload = new Object[]{entry.getKey(), entry.getValue(), memberID};
Event event = eventObject(null, null, payload, new HashMap<String, String>());
try {
asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event);
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java
----------------------------------------------------------------------
diff --git a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java
index ed270c4..580e499 100644
--- a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java
+++ b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java
@@ -32,37 +32,37 @@ public class HealthPublisherClient {
private static final int MB = 1024 * 1024;
- public Object getHealthStats() {
-
- String memberID = System.getProperty("member.id");
+ public Object getHealthStats() {
Runtime runtime = Runtime.getRuntime();
- Map<String, Object> statsMap = new HashMap<String, Object>();
+ Map<String, Double> statsMap = new HashMap<String, Double>();
//statsMap.put("Available Processors", (int)runtime.availableProcessors());
- statsMap.put("total_memory", (int)(runtime.totalMemory() / MB));
- statsMap.put("max_memory", (int)(runtime.maxMemory() / MB));
- statsMap.put("used_memory", (int)((runtime.totalMemory() - runtime.freeMemory()) / MB));
- statsMap.put("free_memory", (int)(runtime.freeMemory() / MB));
- statsMap.put("load_average", (int)ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage());
- statsMap.put("member_id", memberID);
+ statsMap.put("total_memory", (double)(runtime.totalMemory() / MB));
+ //statsMap.put("max_memory", (int)(runtime.maxMemory() / MB));
+ statsMap.put("used_memory", (double)((runtime.totalMemory() - runtime.freeMemory()) / MB));
+ //statsMap.put("free_memory", (int)(runtime.freeMemory() / MB));
+ statsMap.put("load_average", (double)ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage());
+ //statsMap.put("member_id", Integer.parseInt(memberID));
+
+ Object statObj = (Object)statsMap;
+
+ return statObj;
+ }
+
+ public void run() {
+ try {
+ HealthPublisher publisher = new HealthPublisher();
+
+ while (true) {
+ Object healthStatObj = getHealthStats();
+ publisher.update(healthStatObj);
- return statsMap;
- }
-
- public void run() {
- try {
- HealthPublisher publisher = new HealthPublisher();
-
- while (true) {
- Object healthStatObj = getHealthStats();
- publisher.update(healthStatObj);
-
- Thread.sleep(10000);
- }
- } catch(InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- }
+ Thread.sleep(10000);
+ }
+ } catch(InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java
----------------------------------------------------------------------
diff --git a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java
index 13a8e67..98c6224 100644
--- a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java
+++ b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java
@@ -27,27 +27,27 @@ import org.apache.commons.logging.LogFactory;
*/
public class Main {
- private static final Log log = LogFactory.getLog(Main.class);
-
- public static void main (String args[]) {
- try {
+ private static final Log log = LogFactory.getLog(Main.class);
+
+ public static void main (String args[]) {
+ try {
if (log.isInfoEnabled()) {
log.info("Health publisher started");
}
-
+
System.out.println("This is health stat publisher module");
-
+
HealthPublisherClient client = new HealthPublisherClient();
client.run();
-
+
System.exit(0);
-
- } catch (Exception e) {
+
+ } catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Could not publish health stats", e);
}
- }
-
- System.exit(-1);
- }
+ }
+
+ System.exit(-1);
+ }
}
\ No newline at end of file