You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ma...@apache.org on 2013/11/21 11:49:06 UTC

git commit: CEP extention to detect inactive health publishers

Updated Branches:
  refs/heads/master f02cc2bf9 -> ff900c43c


CEP extention to detect inactive health publishers


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/ff900c43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/ff900c43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/ff900c43

Branch: refs/heads/master
Commit: ff900c43c7cb050a624994a6d6e08e26247363f5
Parents: f02cc2b
Author: Manula Thantriwatte <ma...@apache.org>
Authored: Thu Nov 21 16:11:35 2013 +0530
Committer: Manula Thantriwatte <ma...@apache.org>
Committed: Thu Nov 21 16:11:35 2013 +0530

----------------------------------------------------------------------
 .../extension/FaultHandlingWindowProcessor.java | 180 +++++++++++++++++++
 .../src/main/bin/health-publisher.sh            |   2 +-
 .../agent/health/publisher/HealthPublisher.java |  23 +--
 .../health/publisher/HealthPublisherClient.java |  54 +++---
 .../cartridge/agent/health/publisher/Main.java  |  26 +--
 5 files changed, 234 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
new file mode 100644
index 0000000..a995331
--- /dev/null
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.cep.extension;
+
+import org.apache.log4j.Logger;
+import org.wso2.siddhi.core.config.SiddhiContext;
+import org.wso2.siddhi.core.event.StreamEvent;
+import org.wso2.siddhi.core.event.in.InEvent;
+import org.wso2.siddhi.core.event.in.InListEvent;
+import org.wso2.siddhi.core.persistence.ThreadBarrier;
+import org.wso2.siddhi.core.query.QueryPostProcessingElement;
+import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
+import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
+import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
+import org.wso2.siddhi.query.api.definition.AbstractDefinition;
+import org.wso2.siddhi.query.api.expression.Expression;
+import org.wso2.siddhi.query.api.expression.Variable;
+import org.wso2.siddhi.query.api.expression.constant.IntConstant;
+import org.wso2.siddhi.query.api.expression.constant.LongConstant;
+import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@SiddhiExtension(namespace = "stratos", function = "faultHandling")
+public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
+
+    private static final int MILI_TO_MINUTE = 1000;
+    private static final int TIME_OUT = 100;
+
+    static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
+    private ScheduledExecutorService eventRemoverScheduler;
+    private int subjectedAttrIndex;
+    private ThreadBarrier threadBarrier;
+    private long timeToKeep;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+    private ConcurrentHashMap<String, InEvent> timeStampMap = new ConcurrentHashMap<String, InEvent>();
+    private String memberID;
+
+    @Override
+    protected void processEvent(InEvent event) {
+        addDataToMap(event);
+    }
+
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        System.out.println(listEvent);
+        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+            addDataToMap((InEvent) listEvent.getEvent(i));
+        }
+    }
+
+    protected void addDataToMap(InEvent event) {
+        if (memberID != null) {
+            String id = (String)event.getData()[subjectedAttrIndex];
+            timeStampMap.put(id, event);
+        }
+        else {
+            System.out.println("Member ID null");
+            log.error("NULL Member ID");
+        }
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator() {
+        return window.iterator();
+    }
+
+    @Override
+    public Iterator<StreamEvent> iterator(String predicate) {
+        if (siddhiContext.isDistributedProcessingEnabled()) {
+            return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+        } else {
+            return window.iterator();
+        }
+    }
+
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                threadBarrier.pass();
+                Iterator it = timeStampMap.entrySet().iterator();
+
+                while ( it.hasNext() ) {
+                    Map.Entry pair = (Map.Entry)it.next();
+                    long currentTime = System.currentTimeMillis();
+                    InEvent event = (InEvent)pair.getValue();
+
+                    if ((currentTime - event.getTimeStamp()) / MILI_TO_MINUTE > TIME_OUT) {
+                        log.info("Member Inactive : " + pair.getKey());
+                        it.remove();
+                        nextProcessor.process(event);
+                    }
+                }
+            }
+        } catch (Throwable t) {
+            log.error(t.getMessage(), t);
+        }
+    }
+
+    @Override
+    protected Object[] currentState() {
+        return new Object[]{window.currentState()};
+    }
+
+    @Override
+    protected void restoreState(Object[] data) {
+        window.restoreState(data);
+        window.restoreState((Object[]) data[0]);
+        window.reSchedule();
+    }
+
+    @Override
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+        if (parameters[0] instanceof IntConstant) {
+            timeToKeep = ((IntConstant) parameters[0]).getValue();
+        } else {
+            timeToKeep = ((LongConstant) parameters[0]).getValue();
+        }
+
+        memberID = ((Variable)parameters[1]).getAttributeName();
+
+        String subjectedAttr = ((Variable)parameters[1]).getAttributeName();
+        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+
+        if (this.siddhiContext.isDistributedProcessingEnabled()) {
+            window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+        } else {
+            window = new SchedulerSiddhiQueue<StreamEvent>(this);
+        }
+
+        //Ordinary scheduling
+        window.schedule();
+
+    }
+
+    @Override
+    public void schedule() {
+        eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+    }
+
+    public void scheduleNow() {
+        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+        this.eventRemoverScheduler = scheduledExecutorService;
+    }
+
+    public void setThreadBarrier(ThreadBarrier threadBarrier) {
+        this.threadBarrier = threadBarrier;
+    }
+
+    @Override
+    public void destroy(){
+        window = null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh
----------------------------------------------------------------------
diff --git a/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh b/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh
index 2573f32..f5c0104 100755
--- a/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh
+++ b/products/cartridge-agent/modules/health-stats/src/main/bin/health-publisher.sh
@@ -30,5 +30,5 @@ current_path=`pwd`
 
 java -cp $class_path -Dmember.id=$1 -Dkey.file.path=$current_path/../security/client-truststore.jks -Dthrift.receiver.ip=$2 -Dthrift.receiver.port=$3 org.apache.stratos.cartridge.agent.health.publisher.Main $*
 
-echo "Health publisher completed" 
+echo "Health publisher completed"
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java
----------------------------------------------------------------------
diff --git a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java
index 7d56d06..671aa85 100644
--- a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java
+++ b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisher.java
@@ -1,18 +1,18 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one 
+ * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
  * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *  http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
  */
@@ -69,7 +69,8 @@ public class HealthPublisher implements Observer {
                     " 'metaData':[]," +
                     " 'payloadData':[" +
                     " {'name':'health_description','type':'STRING'}," +
-                    " {'name':'value','type':'INT'}" +
+                    " {'name':'value','type':'DOUBLE'}," +
+                    " {'name':'member_id','type':'STRING'}" +
                     " ]" +
                     "}";
             asyncDataPublisher.addStreamDefinition(streamDefinition, CALL_CENTER_DATA_STREAM, VERSION);
@@ -83,23 +84,25 @@ public class HealthPublisher implements Observer {
 
     public void update(Observable arg0, Object arg1) {
         if (arg1 != null && arg1 instanceof Map<?, ?>) {
-            Map<String, Integer> stats = (Map<String, Integer>) arg1;
+            Map<String, Double> stats = (Map<String, Double>) arg1;
             publishEvents(stats);
         }
     }
 
     public void update(Object healthStatObj) {
         if (healthStatObj != null && healthStatObj instanceof Map<?, ?>) {
-            Map<String, Integer> stats = (Map<String, Integer>) healthStatObj;
+            Map<String, Double> stats = (Map<String, Double>) healthStatObj;
             publishEvents(stats);
         }
     }
 
-    private void publishEvents(Map<String, Integer> stats) {
+    private void publishEvents(Map<String, Double> stats) {
+
+        String memberID = System.getProperty("member.id");
 
-        for (Map.Entry<String, Integer> entry : stats.entrySet()) {
+        for (Map.Entry<String, Double> entry : stats.entrySet()) {
 
-            Object[] payload = new Object[]{entry.getKey(), entry.getValue()};
+            Object[] payload = new Object[]{entry.getKey(), entry.getValue(), memberID};
             Event event = eventObject(null, null, payload, new HashMap<String, String>());
             try {
                 asyncDataPublisher.publish(CALL_CENTER_DATA_STREAM, VERSION, event);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java
----------------------------------------------------------------------
diff --git a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java
index ed270c4..580e499 100644
--- a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java
+++ b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/HealthPublisherClient.java
@@ -32,37 +32,37 @@ public class HealthPublisherClient {
 
     private static final int MB = 1024 * 1024;
 
-	public Object getHealthStats() {
-
-        String memberID = System.getProperty("member.id");
+    public Object getHealthStats() {
 
         Runtime runtime = Runtime.getRuntime();
 
-		Map<String, Object> statsMap = new HashMap<String, Object>();
+        Map<String, Double> statsMap = new HashMap<String, Double>();
 
         //statsMap.put("Available Processors", (int)runtime.availableProcessors());
-        statsMap.put("total_memory", (int)(runtime.totalMemory() / MB));
-        statsMap.put("max_memory", (int)(runtime.maxMemory() / MB));
-        statsMap.put("used_memory", (int)((runtime.totalMemory() - runtime.freeMemory()) / MB));
-        statsMap.put("free_memory", (int)(runtime.freeMemory() / MB));
-        statsMap.put("load_average", (int)ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage());
-        statsMap.put("member_id", memberID);
+        statsMap.put("total_memory", (double)(runtime.totalMemory() / MB));
+        //statsMap.put("max_memory", (int)(runtime.maxMemory() / MB));
+        statsMap.put("used_memory", (double)((runtime.totalMemory() - runtime.freeMemory()) / MB));
+        //statsMap.put("free_memory", (int)(runtime.freeMemory() / MB));
+        statsMap.put("load_average", (double)ManagementFactory.getOperatingSystemMXBean().getSystemLoadAverage());
+        //statsMap.put("member_id", Integer.parseInt(memberID));
+
+        Object statObj = (Object)statsMap;
+
+        return statObj;
+    }
+
+    public void run() {
+        try {
+            HealthPublisher publisher = new HealthPublisher();
+
+            while (true) {
+                Object healthStatObj = getHealthStats();
+                publisher.update(healthStatObj);
 
-        return statsMap;
-	}
-	
-	public void run() {
-		try {
-			HealthPublisher publisher = new HealthPublisher();
-			
-			while (true) {
-				Object healthStatObj = getHealthStats();
-				publisher.update(healthStatObj);
-			
-				Thread.sleep(10000);
-			}
-		} catch(InterruptedException ex) {
-		    Thread.currentThread().interrupt();
-		}
-	}
+                Thread.sleep(10000);
+            }
+        } catch(InterruptedException ex) {
+            Thread.currentThread().interrupt();
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ff900c43/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java
----------------------------------------------------------------------
diff --git a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java
index 13a8e67..98c6224 100644
--- a/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java
+++ b/products/cartridge-agent/modules/health-stats/src/main/java/org/apache/stratos/cartridge/agent/health/publisher/Main.java
@@ -27,27 +27,27 @@ import org.apache.commons.logging.LogFactory;
  */
 
 public class Main {
-	private static final Log log = LogFactory.getLog(Main.class);
-	
-	public static void main (String args[]) {
-		try {
+    private static final Log log = LogFactory.getLog(Main.class);
+
+    public static void main (String args[]) {
+        try {
             if (log.isInfoEnabled()) {
                 log.info("Health publisher started");
             }
-            
+
             System.out.println("This is health stat publisher module");
-            
+
             HealthPublisherClient client = new HealthPublisherClient();
             client.run();
-            
+
             System.exit(0);
-			
-		} catch (Exception e) {
+
+        } catch (Exception e) {
             if (log.isErrorEnabled()) {
                 log.error("Could not publish health stats", e);
             }
-		}
-		
-		System.exit(-1);
-	}
+        }
+
+        System.exit(-1);
+    }
 }
\ No newline at end of file