You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ra...@apache.org on 2015/11/01 04:51:09 UTC

stratos git commit: Fixing STRATOS-1594: "Failed to publish member fault event" log message printed repeatedly

Repository: stratos
Updated Branches:
  refs/heads/stratos-4.1.x 3a2acec71 -> b80bfc4f6


Fixing STRATOS-1594: "Failed to publish member fault event" log message printed repeatedly


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b80bfc4f
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b80bfc4f
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b80bfc4f

Branch: refs/heads/stratos-4.1.x
Commit: b80bfc4f6ae9a546b539676c3bc2a67fa56b106c
Parents: 3a2acec
Author: Akila Perera <ra...@gmail.com>
Authored: Sun Nov 1 09:20:41 2015 +0530
Committer: Akila Perera <ra...@gmail.com>
Committed: Sun Nov 1 09:20:41 2015 +0530

----------------------------------------------------------------------
 .../extension/FaultHandlingWindowProcessor.java | 189 +++++++++---------
 .../extension/FaultHandlingWindowProcessor.java | 191 +++++++++----------
 2 files changed, 189 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/b80bfc4f/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 0aa01ed..0526f6a 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -48,20 +48,17 @@ import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * CEP window processor to handle faulty member instances. This window processor is responsible for
  * publishing MemberFault event if health stats are not received within a given time window.
  */
-@SiddhiExtension(namespace = "stratos", function = "faultHandling")
+@SiddhiExtension(namespace = "stratos",
+                 function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
 
-	private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
+    private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
 
     private static final int TIME_OUT = 60 * 1000;
     public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
@@ -70,57 +67,57 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     private ExecutorService executorService;
     private ScheduledExecutorService faultHandleScheduler;
     private ScheduledFuture<?> lastSchedule;
-	private ThreadBarrier threadBarrier;
-	private long timeToKeep;
-	private ISchedulerSiddhiQueue<StreamEvent> window;
-	private EventPublisher healthStatPublisher =
-			EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
-	private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
-	private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
-
-	// Map of member id's to their last received health event time stamp
-	private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
-
-	// Event receiver to receive topology events published by cloud-controller
-	private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
-
-	// Stratos member id attribute index in stream execution plan
-	private int memberIdAttrIndex;
-
-	@Override
-	protected void processEvent(InEvent event) {
-		addDataToMap(event);
-	}
-
-	@Override
-	protected void processEvent(InListEvent listEvent) {
-		for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
-			addDataToMap((InEvent) listEvent.getEvent(i));
-		}
-	}
-
-	/**
-	 * Add new entry to time stamp map from the received event.
-	 *
-	 * @param event Event received by Siddhi.
-	 */
-	protected void addDataToMap(InEvent event) {
-		String id = (String) event.getData()[memberIdAttrIndex];
-		//checking whether this member is the topology.
-		//sometimes there can be a delay between publishing member terminated events
-		//and actually terminating instances. Hence CEP might get events for already terminated members
-		//so we are checking the topology for the member existence
-		Member member = getMemberFromId(id);
-		if (null == member) {
-			log.debug("Member not found in the topology. Event rejected");
-			return;
-		}
+    private ThreadBarrier threadBarrier;
+    private long timeToKeep;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+    private EventPublisher healthStatPublisher = EventPublisherPool
+            .getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
+    private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
+    private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
+
+    // Map of member id's to their last received health event time stamp
+    private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+
+    // Event receiver to receive topology events published by cloud-controller
+    private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
+
+    // Stratos member id attribute index in stream execution plan
+    private int memberIdAttrIndex;
+
+    @Override
+    protected void processEvent(InEvent event) {
+        addDataToMap(event);
+    }
+
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+            addDataToMap((InEvent) listEvent.getEvent(i));
+        }
+    }
+
+    /**
+     * Add new entry to time stamp map from the received event.
+     *
+     * @param event Event received by Siddhi.
+     */
+    protected void addDataToMap(InEvent event) {
+        String id = (String) event.getData()[memberIdAttrIndex];
+        //checking whether this member is the topology.
+        //sometimes there can be a delay between publishing member terminated events
+        //and actually terminating instances. Hence CEP might get events for already terminated members
+        //so we are checking the topology for the member existence
+        Member member = getMemberFromId(id);
+        if (null == member) {
+            log.warn(String.format("Member with [id] %s not found in the topology. Event rejected", id));
+            return;
+        }
         if (StringUtils.isNotEmpty(id)) {
             memberTimeStampMap.put(id, event.getTimeStamp());
         } else {
             log.warn("NULL member id found in the event received. Event rejected.");
         }
-        if (log.isDebugEnabled()){
+        if (log.isDebugEnabled()) {
             log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
         }
     }
@@ -140,15 +137,15 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     }
 
     /**
-     *  Retrieve the current activated members from the topology and initialize the timestamp map.
-     *  This will allow the system to recover from a restart
+     * Retrieve the current activated members from the topology and initialize the timestamp map.
+     * This will allow the system to recover from a restart
      *
-     *  @param topology Topology model object
+     * @param topology Topology model object
      */
-    boolean loadTimeStampMapFromTopology(Topology topology){
+    boolean loadTimeStampMapFromTopology(Topology topology) {
 
         long currentTimeStamp = System.currentTimeMillis();
-        if (topology == null || topology.getServices() == null){
+        if (topology == null || topology.getServices() == null) {
             return false;
         }
         // TODO make this efficient by adding APIs to messaging component
@@ -168,21 +165,21 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
             }
         }
 
-        if (log.isDebugEnabled()){
-            log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " +
-                    memberTimeStampMap);
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap);
         }
         return true;
     }
 
-    private Member getMemberFromId(String memberId){
-        if (StringUtils.isEmpty(memberId)){
+    private Member getMemberFromId(String memberId) {
+        if (StringUtils.isEmpty(memberId)) {
             return null;
         }
-        if (TopologyManager.getTopology().isInitialized()){
-        	try {
+        if (TopologyManager.getTopology().isInitialized()) {
+            try {
                 TopologyManager.acquireReadLock();
-                if (TopologyManager.getTopology().getServices() == null){
+                if (TopologyManager.getTopology().getServices() == null) {
                     return null;
                 }
                 // TODO make this efficient by adding APIs to messaging component
@@ -190,8 +187,8 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
                     if (service.getClusters() != null) {
                         for (Cluster cluster : service.getClusters()) {
                             if (cluster.getMembers() != null) {
-                                for (Member member : cluster.getMembers()){
-                                    if (memberId.equals(member.getMemberId())){
+                                for (Member member : cluster.getMembers()) {
+                                    if (memberId.equals(member.getMemberId())) {
                                         return member;
                                     }
                                 }
@@ -199,27 +196,24 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
                         }
                     }
                 }
-        	} catch (Exception e) {
-        		log.error("Error while reading topology" + e);
-        	} finally {
-        		TopologyManager.releaseReadLock();
-        	}
+            } catch (Exception e) {
+                log.error("Error while reading topology" + e);
+            } finally {
+                TopologyManager.releaseReadLock();
+            }
         }
         return null;
     }
 
-    private void publishMemberFault(String memberId){
-        Member member = getMemberFromId(memberId);
-        if (member == null){
-            log.warn("Failed to publish member fault event. Member having [member-id] " + memberId +
-                    " does not exist in topology");
+    private void publishMemberFault(Member member) {
+        if (member == null) {
+            log.warn("Failed to publish member fault event. Member object is null");
             return;
         }
-        log.info("Publishing member fault event for [member-id] " + memberId);
+        log.info("Publishing member fault event for [member-id] " + member.getMemberId());
 
         MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(),
-                member.getMemberId(), member.getPartitionId(),
-                member.getNetworkPartitionId(), 0);
+                member.getMemberId(), member.getPartitionId(), member.getNetworkPartitionId(), 0);
 
         memberFaultEventMessageMap.put("message", memberFaultEvent);
         healthStatPublisher.publish(MemberFaultEventMap, true);
@@ -229,19 +223,23 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     public void run() {
         try {
             threadBarrier.pass();
-
             for (Object o : memberTimeStampMap.entrySet()) {
                 Map.Entry pair = (Map.Entry) o;
                 long currentTime = System.currentTimeMillis();
                 Long eventTimeStamp = (Long) pair.getValue();
 
                 if ((currentTime - eventTimeStamp) > TIME_OUT) {
-                    log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
-                            eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
-                    publishMemberFault((String) pair.getKey());
+                    String memberId = (String) pair.getKey();
+                    Member member = getMemberFromId(memberId);
+                    if (member != null) {
+                        log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
+                                eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
+                        publishMemberFault(member);
+                    }
+                    memberTimeStampMap.remove(memberId);
                 }
             }
-            if (log.isDebugEnabled()){
+            if (log.isDebugEnabled()) {
                 log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
                         memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
             }
@@ -257,7 +255,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
 
     @Override
     protected Object[] currentState() {
-        return new Object[]{window.currentState()};
+        return new Object[] { window.currentState() };
     }
 
     @Override
@@ -269,7 +267,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
 
     @Override
     protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
-                        AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+            AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
 
         if (parameters[0] instanceof IntConstant) {
             timeToKeep = ((IntConstant) parameters[0]).getValue();
@@ -285,16 +283,17 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
         } else {
             window = new SchedulerSiddhiQueue<StreamEvent>(this);
         }
-        MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
+        MemberFaultEventMap
+                .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
 
-	    executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY,
-                CEP_EXTENSION_THREAD_POOL_SIZE);
-	    cepTopologyEventReceiver.setExecutorService(executorService);
+        executorService = StratosThreadPool
+                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
+        cepTopologyEventReceiver.setExecutorService(executorService);
         cepTopologyEventReceiver.execute();
 
         //Ordinary scheduling
         window.schedule();
-        if (log.isDebugEnabled()){
+        if (log.isDebugEnabled()) {
             log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
                     ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
                     ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
@@ -328,13 +327,13 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     }
 
     @Override
-    public void destroy(){
+    public void destroy() {
         // terminate topology listener thread
         cepTopologyEventReceiver.terminate();
         window = null;
 
         // Shutdown executor service
-        if(executorService != null) {
+        if (executorService != null) {
             try {
                 executorService.shutdownNow();
             } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/b80bfc4f/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 305eda8..7d8a2a5 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -25,10 +25,10 @@ import org.wso2.siddhi.core.config.SiddhiContext;
 import org.wso2.siddhi.core.event.StreamEvent;
 import org.wso2.siddhi.core.event.in.InEvent;
 import org.wso2.siddhi.core.event.in.InListEvent;
-import org.wso2.siddhi.core.snapshot.ThreadBarrier;
 import org.wso2.siddhi.core.query.QueryPostProcessingElement;
 import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
 import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.snapshot.ThreadBarrier;
 import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
 import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
 import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
@@ -42,20 +42,17 @@ import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
 
 /**
  * CEP window processor to handle faulty member instances. This window processor is responsible for
  * publishing MemberFault event if health stats are not received within a given time window.
  */
-@SiddhiExtension(namespace = "stratos", function = "faultHandling")
+@SiddhiExtension(namespace = "stratos",
+                 function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
 
-	private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
+    private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
 
     private static final int TIME_OUT = 60 * 1000;
     public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
@@ -64,57 +61,57 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     private ExecutorService executorService;
     private ScheduledExecutorService faultHandleScheduler;
     private ScheduledFuture<?> lastSchedule;
-	private ThreadBarrier threadBarrier;
-	private long timeToKeep;
-	private ISchedulerSiddhiQueue<StreamEvent> window;
-	private EventPublisher healthStatPublisher =
-			EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
-	private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
-	private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
-
-	// Map of member id's to their last received health event time stamp
-	private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
-
-	// Event receiver to receive topology events published by cloud-controller
-	private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
-
-	// Stratos member id attribute index in stream execution plan
-	private int memberIdAttrIndex;
-
-	@Override
-	protected void processEvent(InEvent event) {
-		addDataToMap(event);
-	}
-
-	@Override
-	protected void processEvent(InListEvent listEvent) {
-		for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
-			addDataToMap((InEvent) listEvent.getEvent(i));
-		}
-	}
-
-	/**
-	 * Add new entry to time stamp map from the received event.
-	 *
-	 * @param event Event received by Siddhi.
-	 */
-	protected void addDataToMap(InEvent event) {
-		String id = (String) event.getData()[memberIdAttrIndex];
-		//checking whether this member is the topology.
-		//sometimes there can be a delay between publishing member terminated events
-		//and actually terminating instances. Hence CEP might get events for already terminated members
-		//so we are checking the topology for the member existence
-		Member member = getMemberFromId(id);
-		if (null == member) {
-			log.debug("Member not found in the topology. Event rejected");
-			return;
-		}
+    private ThreadBarrier threadBarrier;
+    private long timeToKeep;
+    private ISchedulerSiddhiQueue<StreamEvent> window;
+    private EventPublisher healthStatPublisher = EventPublisherPool
+            .getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
+    private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
+    private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
+
+    // Map of member id's to their last received health event time stamp
+    private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+
+    // Event receiver to receive topology events published by cloud-controller
+    private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
+
+    // Stratos member id attribute index in stream execution plan
+    private int memberIdAttrIndex;
+
+    @Override
+    protected void processEvent(InEvent event) {
+        addDataToMap(event);
+    }
+
+    @Override
+    protected void processEvent(InListEvent listEvent) {
+        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+            addDataToMap((InEvent) listEvent.getEvent(i));
+        }
+    }
+
+    /**
+     * Add new entry to time stamp map from the received event.
+     *
+     * @param event Event received by Siddhi.
+     */
+    protected void addDataToMap(InEvent event) {
+        String id = (String) event.getData()[memberIdAttrIndex];
+        //checking whether this member is the topology.
+        //sometimes there can be a delay between publishing member terminated events
+        //and actually terminating instances. Hence CEP might get events for already terminated members
+        //so we are checking the topology for the member existence
+        Member member = getMemberFromId(id);
+        if (null == member) {
+            log.warn("Member not found in the topology. Event rejected");
+            return;
+        }
         if (StringUtils.isNotEmpty(id)) {
             memberTimeStampMap.put(id, event.getTimeStamp());
         } else {
             log.warn("NULL member id found in the event received. Event rejected.");
         }
-        if (log.isDebugEnabled()){
+        if (log.isDebugEnabled()) {
             log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
         }
     }
@@ -134,15 +131,15 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     }
 
     /**
-     *  Retrieve the current activated members from the topology and initialize the timestamp map.
-     *  This will allow the system to recover from a restart
+     * Retrieve the current activated members from the topology and initialize the timestamp map.
+     * This will allow the system to recover from a restart
      *
-     *  @param topology Topology model object
+     * @param topology Topology model object
      */
-    boolean loadTimeStampMapFromTopology(Topology topology){
+    boolean loadTimeStampMapFromTopology(Topology topology) {
 
         long currentTimeStamp = System.currentTimeMillis();
-        if (topology == null || topology.getServices() == null){
+        if (topology == null || topology.getServices() == null) {
             return false;
         }
         // TODO make this efficient by adding APIs to messaging component
@@ -162,21 +159,21 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
             }
         }
 
-        if (log.isDebugEnabled()){
-            log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " +
-                    memberTimeStampMap);
+        if (log.isDebugEnabled()) {
+            log.debug(
+                    "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap);
         }
         return true;
     }
 
-    private Member getMemberFromId(String memberId){
-        if (StringUtils.isEmpty(memberId)){
+    private Member getMemberFromId(String memberId) {
+        if (StringUtils.isEmpty(memberId)) {
             return null;
         }
-        if (TopologyManager.getTopology().isInitialized()){
-        	try {
+        if (TopologyManager.getTopology().isInitialized()) {
+            try {
                 TopologyManager.acquireReadLock();
-                if (TopologyManager.getTopology().getServices() == null){
+                if (TopologyManager.getTopology().getServices() == null) {
                     return null;
                 }
                 // TODO make this efficient by adding APIs to messaging component
@@ -184,8 +181,8 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
                     if (service.getClusters() != null) {
                         for (Cluster cluster : service.getClusters()) {
                             if (cluster.getMembers() != null) {
-                                for (Member member : cluster.getMembers()){
-                                    if (memberId.equals(member.getMemberId())){
+                                for (Member member : cluster.getMembers()) {
+                                    if (memberId.equals(member.getMemberId())) {
                                         return member;
                                     }
                                 }
@@ -193,27 +190,24 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
                         }
                     }
                 }
-        	} catch (Exception e) {
-        		log.error("Error while reading topology" + e);
-        	} finally {
-        		TopologyManager.releaseReadLock();
-        	}
+            } catch (Exception e) {
+                log.error("Error while reading topology" + e);
+            } finally {
+                TopologyManager.releaseReadLock();
+            }
         }
         return null;
     }
 
-    private void publishMemberFault(String memberId){
-        Member member = getMemberFromId(memberId);
-        if (member == null){
-            log.warn("Failed to publish member fault event. Member having [member-id] " + memberId +
-                    " does not exist in topology");
+    private void publishMemberFault(Member member) {
+        if (member == null) {
+            log.warn("Failed to publish member fault event. Member object is null");
             return;
         }
-        log.info("Publishing member fault event for [member-id] " + memberId);
+        log.info("Publishing member fault event for [member-id] " + member.getMemberId());
 
         MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(),
-                member.getMemberId(), member.getPartitionId(),
-                member.getNetworkPartitionId(), 0);
+                member.getMemberId(), member.getPartitionId(), member.getNetworkPartitionId(), 0);
 
         memberFaultEventMessageMap.put("message", memberFaultEvent);
         healthStatPublisher.publish(MemberFaultEventMap, true);
@@ -223,19 +217,23 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     public void run() {
         try {
             threadBarrier.pass();
-
             for (Object o : memberTimeStampMap.entrySet()) {
                 Map.Entry pair = (Map.Entry) o;
                 long currentTime = System.currentTimeMillis();
                 Long eventTimeStamp = (Long) pair.getValue();
 
                 if ((currentTime - eventTimeStamp) > TIME_OUT) {
-                    log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
-                            eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
-                    publishMemberFault((String) pair.getKey());
+                    String memberId = (String) pair.getKey();
+                    Member member = getMemberFromId(memberId);
+                    if (member != null) {
+                        log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
+                                eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
+                        publishMemberFault(member);
+                    }
+                    memberTimeStampMap.remove(memberId);
                 }
             }
-            if (log.isDebugEnabled()){
+            if (log.isDebugEnabled()) {
                 log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
                         memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
             }
@@ -251,7 +249,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
 
     @Override
     protected Object[] currentState() {
-        return new Object[]{window.currentState()};
+        return new Object[] { window.currentState() };
     }
 
     @Override
@@ -262,7 +260,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
 
     @Override
     protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
-                        AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+            AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
 
         if (parameters[0] instanceof IntConstant) {
             timeToKeep = ((IntConstant) parameters[0]).getValue();
@@ -278,16 +276,17 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
         } else {
             window = new SchedulerSiddhiQueue<StreamEvent>(this);
         }
-        MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
+        MemberFaultEventMap
+                .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
 
-	    executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY,
-                CEP_EXTENSION_THREAD_POOL_SIZE);
-	    cepTopologyEventReceiver.setExecutorService(executorService);
+        executorService = StratosThreadPool
+                .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
+        cepTopologyEventReceiver.setExecutorService(executorService);
         cepTopologyEventReceiver.execute();
 
         //Ordinary scheduling
         window.schedule();
-        if (log.isDebugEnabled()){
+        if (log.isDebugEnabled()) {
             log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
                     ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
                     ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
@@ -321,13 +320,13 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     }
 
     @Override
-    public void destroy(){
+    public void destroy() {
         // terminate topology listener thread
         cepTopologyEventReceiver.terminate();
         window = null;
 
         // Shutdown executor service
-        if(executorService != null) {
+        if (executorService != null) {
             try {
                 executorService.shutdownNow();
             } catch (Exception e) {