You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to commits@stratos.apache.org by re...@apache.org on 2014/10/27 09:44:36 UTC

[2/2] git commit: fixing cep listening on topology issue by merging with master

fixing cep listening on topology issue by merging with master


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7a5797df
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7a5797df
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7a5797df

Branch: refs/heads/4.0.0-grouping
Commit: 7a5797df08bbf92483cd5993e6ce9ca576686e3d
Parents: bea691b
Author: reka <rt...@gmail.com>
Authored: Mon Oct 27 14:10:15 2014 +0530
Committer: reka <rt...@gmail.com>
Committed: Mon Oct 27 14:14:22 2014 +0530

----------------------------------------------------------------------
 .../cep/extension/CEPTopologyEventReceiver.java | 125 +++++++++++
 .../extension/FaultHandlingWindowProcessor.java | 217 ++++++++++++-------
 2 files changed, 267 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/7a5797df/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
new file mode 100644
index 0000000..90c67f0
--- /dev/null
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cep.extension;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * CEP topology receiver for the fault handling window processor.
+ * <p>
+ * Subscribes to topology events and keeps the member time stamp map of the {@link FaultHandlingWindowProcessor}
+ * in sync with the topology: the map is loaded once from the complete topology event, members are added
+ * when they are activated and removed when they are terminated.
+ */
+public class CEPTopologyEventReceiver implements Runnable {
+
+    private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
+
+    // Delay (ms) before the topology event receiver is started
+    private static final long STARTUP_DELAY_MS = 15000;
+    // Interval (ms) at which run() checks whether terminate() has been called
+    private static final long TERMINATION_POLL_INTERVAL_MS = 1000;
+
+    private final TopologyEventReceiver topologyEventReceiver;
+    private final FaultHandlingWindowProcessor faultHandler;
+    // Set by terminate() and polled by the thread executing run(), hence volatile
+    private volatile boolean terminated;
+
+    /**
+     * Create a receiver that keeps the time stamp map of the given fault handler up to date.
+     *
+     * @param faultHandler window processor whose member time stamp map is maintained
+     */
+    public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
+        this.topologyEventReceiver = new TopologyEventReceiver();
+        this.faultHandler = faultHandler;
+        addEventListeners();
+    }
+
+    private void addEventListeners() {
+        // Load member time stamp map from the topology as a one time task
+        topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+            // Becomes true once the map has been loaded; later complete topology events are then ignored
+            private boolean initialized;
+
+            @Override
+            protected void onEvent(Event event) {
+                if (initialized) {
+                    return;
+                }
+                // Acquire before the try block so that finally only releases a lock that is actually held
+                TopologyManager.acquireReadLock();
+                try {
+                    log.info("Complete topology event received to fault handling window processor.");
+                    CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event;
+                    initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
+                } catch (Exception e) {
+                    log.error("Error loading member time stamp map from complete topology event.", e);
+                } finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+        });
+
+        // Remove member from the time stamp map when MemberTerminated event is received.
+        topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                String memberId = ((MemberTerminatedEvent) event).getMemberId();
+                Long removed = faultHandler.getMemberTimeStampMap().remove(memberId);
+                if (removed != null) {
+                    log.info("Member [member id] " + memberId + " was removed from the time stamp map.");
+                } else if (log.isDebugEnabled()) {
+                    log.debug("Member [member id] " + memberId + " was not in the time stamp map.");
+                }
+            }
+        });
+
+        // Add member to time stamp map whenever member is activated
+        topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                String memberId = ((MemberActivatedEvent) event).getMemberId();
+                // Do not overwrite the entry if a health event has already been received for this member
+                Long previous = faultHandler.getMemberTimeStampMap().putIfAbsent(memberId, System.currentTimeMillis());
+                if (previous == null) {
+                    log.info("Member [member id] " + memberId + " was added to the time stamp map.");
+                } else if (log.isDebugEnabled()) {
+                    log.debug("Member [member id] " + memberId + " was already in the time stamp map.");
+                }
+            }
+        });
+    }
+
+    /**
+     * Start the topology event receiver in its own thread after a startup delay, then block until
+     * {@link #terminate()} is called or this thread is interrupted.
+     */
+    @Override
+    public void run() {
+        try {
+            // NOTE(review): the reason for this startup delay is not documented here - confirm it is still needed
+            Thread.sleep(STARTUP_DELAY_MS);
+        } catch (InterruptedException e) {
+            // Treat an interrupt as a request to stop; restore the interrupt status for the thread's owner
+            Thread.currentThread().interrupt();
+            log.info("CEP topology receiver thread interrupted before it was started");
+            return;
+        }
+        if (terminated) {
+            // terminate() was called during the startup delay: do not subscribe to topology events at all
+            log.info("CEP topology receiver thread terminated");
+            return;
+        }
+        Thread thread = new Thread(topologyEventReceiver);
+        thread.start();
+        log.info("CEP topology receiver thread started");
+
+        // Keep the thread alive until terminated
+        while (!terminated) {
+            try {
+                Thread.sleep(TERMINATION_POLL_INTERVAL_MS);
+            } catch (InterruptedException e) {
+                // Treat an interrupt as a termination request and keep the interrupt status set
+                Thread.currentThread().interrupt();
+                terminate();
+            }
+        }
+        log.info("CEP topology receiver thread terminated");
+    }
+
+    /**
+     * Terminate the topology event receiver and make {@link #run()} return (after the startup delay if pending).
+     */
+    public void terminate() {
+        topologyEventReceiver.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/stratos/blob/7a5797df/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 80174f4..0104a03 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -18,14 +18,19 @@
  */
 package org.apache.stratos.cep.extension;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.messaging.util.Constants;
@@ -52,23 +57,31 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * Siddhi window processor that detects faulty member instances. It tracks the time stamp of the last health event
+ * of each member and publishes a MemberFaultEvent for members whose time stamp is older than TIME_OUT ms.
+ */
 @SiddhiExtension(namespace = "stratos", function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
 
     private static final int TIME_OUT = 60 * 1000;
     static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
-    private ScheduledExecutorService eventRemoverScheduler;
-    private int subjectedAttrIndex;
+    private ScheduledExecutorService faultHandleScheduler;
     private ThreadBarrier threadBarrier;
     private long timeToKeep;
     private ISchedulerSiddhiQueue<StreamEvent> window;
+    private EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC);
+    private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); // {MemberFaultEvent class name -> memberFaultEventMessageMap}, filled in init()
+    private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); // {"message" -> MemberFaultEvent}, set before each publish
+
+    // Member id -> time stamp of its last health event (or the time it started being tracked from the topology)
     private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
-    private ConcurrentHashMap<String, Member> memberIdMap = new ConcurrentHashMap<String, Member>();
-    EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC);
-    Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
-    Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
-    private TopologyEventReceiver topologyEventReceiver;
-    private String memberID;
+
+    // Receives topology events and keeps memberTimeStampMap in sync with member activation/termination
+    private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
+
+    // Position of the member id attribute in the input stream definition (set in init())
+    private int memberIdAttrIndex;
 
     @Override
     protected void processEvent(InEvent event) {
@@ -77,20 +90,36 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
 
     @Override
     protected void processEvent(InListEvent listEvent) {
-        System.out.println(listEvent);
         for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
             addDataToMap((InEvent) listEvent.getEvent(i));
         }
     }
 
+    /**
+     * Record the time stamp of a health event received from a member.
+     * <p>
+     * Events without a member id, or from members not found in the topology (including before it is initialized), are rejected.
+     *
+     * @param event Event received by Siddhi.
+     */
     protected void addDataToMap(InEvent event) {
-        if (memberID != null) {
-            String id = (String)event.getData()[subjectedAttrIndex];
-            memberTimeStampMap.put(id, event.getTimeStamp());
-            log.debug("Event received from [member-id] " + id);
-        }
-        else {
-            log.error("NULL member ID in the event received");
-        }
+        String id = (String) event.getData()[memberIdAttrIndex];
+        if (StringUtils.isEmpty(id)) {
+            log.warn("NULL member id found in the event received. Event rejected.");
+            return;
+        }
+        // Member terminated events can be published before the instances actually stop, so CEP may still
+        // receive health events from members that are no longer in the topology. Reject them so they are
+        // not tracked (and reported as faulty) again.
+        if (getMemberFromId(id) == null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Member not found in the topology. Event rejected [member-id] " + id);
+            }
+            return;
+        }
+        memberTimeStampMap.put(id, event.getTimeStamp());
+        if (log.isDebugEnabled()) {
+            log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
+        }
     }
 
@@ -108,55 +135,88 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
         }
     }
 
-    /*
-    *  Retrieve the current activated member list from the topology and put them into the
-    *  memberTimeStampMap if not already exists. This will allow the system to recover
-    *  from any inconsistent state caused by MB/CEP failures.
-    */
-    private void loadFromTopology(){
-        if (TopologyManager.getTopology().isInitialized()){
-            TopologyManager.acquireReadLock();
-            memberIdMap.clear();
-            long currentTimeStamp = System.currentTimeMillis();
-            Iterator<Service> servicesItr = TopologyManager.getTopology().getServices().iterator();
-            while(servicesItr.hasNext()){
-                Service service = servicesItr.next();
-                Iterator<Cluster> clusterItr = service.getClusters().iterator();
-                while(clusterItr.hasNext()){
-                    Cluster cluster = clusterItr.next();
-                    Iterator<Member> memberItr = cluster.getMembers().iterator();
-                    while(memberItr.hasNext()){
-                        Member member = memberItr.next();
-                        if (member.getStatus().equals(MemberStatus.Activated)){
-                            memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
-                            memberIdMap.put(member.getMemberId(), member);
+    /**
+     *  Add every member in Activated state in the given topology to the time stamp map with the current time
+     *  (existing entries are kept), so that fault detection also covers members activated before a restart.
+     *  @param topology Topology model object
+     *  @return false if the topology or its service collection is null (nothing loaded), true otherwise
+     */
+    boolean loadTimeStampMapFromTopology(Topology topology){
+
+        long currentTimeStamp = System.currentTimeMillis();
+        if (topology == null || topology.getServices() == null){
+            return false;
+        }
+        // TODO make this efficient by adding APIs to messaging component
+        for (Service service : topology.getServices()) {
+            if (service.getClusters() != null) {
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.getMembers() != null) {
+                        for (Member member : cluster.getMembers()) {
+                            // we are checking faulty status only in previously activated members
+                            if (member != null && MemberStatus.Activated.equals(member.getStatus())) {
+                                // Initialize the member time stamp map from the topology at the beginning
+                                memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
+                            }
                         }
                     }
                 }
             }
-            TopologyManager.releaseReadLock();
         }
+
+        log.info("Member time stamp map was successfully loaded from the topology.");
         if (log.isDebugEnabled()){
             log.debug("Member TimeStamp Map: " + memberTimeStampMap);
-            log.debug("Member ID Map: " + memberIdMap);
         }
+        return true;
     }
 
-    private void publishMemberFault(String memberID){
-        Member member = memberIdMap.get(memberID);
+    // Find the member with the given id in the topology, reading it under the topology read lock. Returns null
+    // if the id is empty, the topology is not initialized, no member matches or the topology cannot be read.
+    private Member getMemberFromId(String memberId){
+        if (StringUtils.isEmpty(memberId) || !TopologyManager.getTopology().isInitialized()){
+            return null;
+        }
+        TopologyManager.acquireReadLock(); // outside try: finally must only release a lock that is held
+        try {
+            if (TopologyManager.getTopology().getServices() == null){
+                return null;
+            }
+            // TODO make this efficient by adding APIs to messaging component
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                if (service.getClusters() != null) {
+                    for (Cluster cluster : service.getClusters()) {
+                        if (cluster.getMembers() != null) {
+                            for (Member member : cluster.getMembers()){
+                                if (member != null && memberId.equals(member.getMemberId())){
+                                    return member;
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error while reading topology", e);
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+        return null;
+    }
+
+    private void publishMemberFault(String memberId){ // publish a MemberFaultEvent for a member found in the topology
+        Member member = getMemberFromId(memberId);
         if (member == null){
-            log.error("Failed to publish MemberFault event. Member having [member-id] " + memberID + " does not exist in topology");
+            log.error("Failed to publish member fault event. Member having [member-id] " + memberId +
+                    " does not exist in topology");
             return;
         }
-        MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getMemberId(), member.getPartitionId(), 0);
-        memberFaultEventMessageMap.put("message", memberFaultEvent);
-        Properties headers = new Properties();
-        headers.put(Constants.EVENT_CLASS_NAME, memberFaultEvent.getClass().getName());
-        healthStatPublisher.publish(MemberFaultEventMap, headers, true);
+        log.info("Publishing member fault event for [member-id] " + memberId);
 
-        if (log.isDebugEnabled()){
-            log.debug("Published MemberFault event for [member-id] " + memberID);
-        }
+        MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getMemberId(),
+                member.getPartitionId(), 0);
+        memberFaultEventMessageMap.put("message", memberFaultEvent); // wrapped by MemberFaultEventMap (see init())
+        healthStatPublisher.publish(MemberFaultEventMap, true); // published on the HEALTH_STAT_TOPIC publisher
     }
 
 
@@ -164,26 +224,26 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     public void run() {
         try {
             threadBarrier.pass();
-            loadFromTopology();
-            Iterator it = memberTimeStampMap.entrySet().iterator();
 
-            while ( it.hasNext() ) {
-                Map.Entry pair = (Map.Entry)it.next();
+            for (Map.Entry<String, Long> pair : memberTimeStampMap.entrySet()) {
+                // Entries are not removed on fault, so a fault is re-published on each run while the member stays stale
                 long currentTime = System.currentTimeMillis();
                 Long eventTimeStamp = (Long) pair.getValue();
 
                 if ((currentTime - eventTimeStamp) > TIME_OUT) {
-                    log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
-                    it.remove();
+                    log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
+                            eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
                     publishMemberFault((String) pair.getKey());
                 }
             }
             if (log.isDebugEnabled()){
-                log.debug("Fault handling processor iteration completed with [time-stamp map length] " + memberTimeStampMap.size() + " [activated member-count] " + memberIdMap.size());
+                log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
+                        memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
             }
-            eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
         } catch (Throwable t) {
             log.error(t.getMessage(), t);
+        } finally {
+            faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -200,17 +260,16 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     }
 
     @Override
-    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+    protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
+                        AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
         if (parameters[0] instanceof IntConstant) {
             timeToKeep = ((IntConstant) parameters[0]).getValue();
         } else {
             timeToKeep = ((LongConstant) parameters[0]).getValue();
         }
 
-        memberID = ((Variable)parameters[1]).getAttributeName();
-
-        String subjectedAttr = ((Variable)parameters[1]).getAttributeName();
-        subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+        String memberIdAttrName = ((Variable) parameters[1]).getAttributeName();
+        memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName);
 
         if (this.siddhiContext.isDistributedProcessingEnabled()) {
             window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
@@ -218,29 +277,32 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
             window = new SchedulerSiddhiQueue<StreamEvent>(this);
         }
         MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
-        this.topologyEventReceiver = new TopologyEventReceiver();
-        Thread thread = new Thread(topologyEventReceiver);
-        thread.start();
-        log.info("WSO2 CEP topology receiver thread started");
+
+        Thread topologyTopicSubscriberThread = new Thread(cepTopologyEventReceiver);
+        topologyTopicSubscriberThread.start();
 
         //Ordinary scheduling
         window.schedule();
-
+        if (log.isDebugEnabled()){
+            log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
+                    ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
+                    ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
+        }
     }
 
     @Override
     public void schedule() {
-        eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+        faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS); // run the fault check (run()) once after timeToKeep ms
     }
 
     @Override
     public void scheduleNow() {
-        eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+        faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS); // run the fault check (run()) without delay
     }
 
     @Override
     public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
-        this.eventRemoverScheduler = scheduledExecutorService;
+        this.faultHandleScheduler = scheduledExecutorService; // executor used by schedule(), scheduleNow() and run() to (re)schedule the fault check
     }
 
     @Override
@@ -250,7 +312,12 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
 
     @Override
     public void destroy(){
-        this.topologyEventReceiver.terminate();
+        // Stop the CEP topology receiver; its terminate() also terminates the underlying topology event receiver
+        cepTopologyEventReceiver.terminate();
         window = null;
     }
+
+    public ConcurrentHashMap<String, Long> getMemberTimeStampMap() { // live (mutable) map; CEPTopologyEventReceiver adds/removes members in it
+        return memberTimeStampMap;
+    }
 }