You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ra...@apache.org on 2015/11/01 04:51:09 UTC
stratos git commit: Fixing STRATOS-1594: Failed to publish member fault event log message printed repeatedly
Repository: stratos
Updated Branches:
refs/heads/stratos-4.1.x 3a2acec71 -> b80bfc4f6
Fixing STRATOS-1594: Failed to publish member fault event log message printed repeatedly
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b80bfc4f
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b80bfc4f
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b80bfc4f
Branch: refs/heads/stratos-4.1.x
Commit: b80bfc4f6ae9a546b539676c3bc2a67fa56b106c
Parents: 3a2acec
Author: Akila Perera <ra...@gmail.com>
Authored: Sun Nov 1 09:20:41 2015 +0530
Committer: Akila Perera <ra...@gmail.com>
Committed: Sun Nov 1 09:20:41 2015 +0530
----------------------------------------------------------------------
.../extension/FaultHandlingWindowProcessor.java | 189 +++++++++---------
.../extension/FaultHandlingWindowProcessor.java | 191 +++++++++----------
2 files changed, 189 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/b80bfc4f/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 0aa01ed..0526f6a 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.0.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -48,20 +48,17 @@ import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
* CEP window processor to handle faulty member instances. This window processor is responsible for
* publishing MemberFault event if health stats are not received within a given time window.
*/
-@SiddhiExtension(namespace = "stratos", function = "faultHandling")
+@SiddhiExtension(namespace = "stratos",
+ function = "faultHandling")
public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
- private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
+ private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
private static final int TIME_OUT = 60 * 1000;
public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
@@ -70,57 +67,57 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
private ExecutorService executorService;
private ScheduledExecutorService faultHandleScheduler;
private ScheduledFuture<?> lastSchedule;
- private ThreadBarrier threadBarrier;
- private long timeToKeep;
- private ISchedulerSiddhiQueue<StreamEvent> window;
- private EventPublisher healthStatPublisher =
- EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
- private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
- private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
-
- // Map of member id's to their last received health event time stamp
- private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
-
- // Event receiver to receive topology events published by cloud-controller
- private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
-
- // Stratos member id attribute index in stream execution plan
- private int memberIdAttrIndex;
-
- @Override
- protected void processEvent(InEvent event) {
- addDataToMap(event);
- }
-
- @Override
- protected void processEvent(InListEvent listEvent) {
- for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
- addDataToMap((InEvent) listEvent.getEvent(i));
- }
- }
-
- /**
- * Add new entry to time stamp map from the received event.
- *
- * @param event Event received by Siddhi.
- */
- protected void addDataToMap(InEvent event) {
- String id = (String) event.getData()[memberIdAttrIndex];
- //checking whether this member is the topology.
- //sometimes there can be a delay between publishing member terminated events
- //and actually terminating instances. Hence CEP might get events for already terminated members
- //so we are checking the topology for the member existence
- Member member = getMemberFromId(id);
- if (null == member) {
- log.debug("Member not found in the topology. Event rejected");
- return;
- }
+ private ThreadBarrier threadBarrier;
+ private long timeToKeep;
+ private ISchedulerSiddhiQueue<StreamEvent> window;
+ private EventPublisher healthStatPublisher = EventPublisherPool
+ .getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
+ private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
+ private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
+
+ // Map of member id's to their last received health event time stamp
+ private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+
+ // Event receiver to receive topology events published by cloud-controller
+ private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
+
+ // Stratos member id attribute index in stream execution plan
+ private int memberIdAttrIndex;
+
+ @Override
+ protected void processEvent(InEvent event) {
+ addDataToMap(event);
+ }
+
+ @Override
+ protected void processEvent(InListEvent listEvent) {
+ for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+ addDataToMap((InEvent) listEvent.getEvent(i));
+ }
+ }
+
+ /**
+ * Add new entry to time stamp map from the received event.
+ *
+ * @param event Event received by Siddhi.
+ */
+ protected void addDataToMap(InEvent event) {
+ String id = (String) event.getData()[memberIdAttrIndex];
+ //checking whether this member is the topology.
+ //sometimes there can be a delay between publishing member terminated events
+ //and actually terminating instances. Hence CEP might get events for already terminated members
+ //so we are checking the topology for the member existence
+ Member member = getMemberFromId(id);
+ if (null == member) {
+ log.warn(String.format("Member with [id] %s not found in the topology. Event rejected", id));
+ return;
+ }
if (StringUtils.isNotEmpty(id)) {
memberTimeStampMap.put(id, event.getTimeStamp());
} else {
log.warn("NULL member id found in the event received. Event rejected.");
}
- if (log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
}
}
@@ -140,15 +137,15 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
}
/**
- * Retrieve the current activated members from the topology and initialize the timestamp map.
- * This will allow the system to recover from a restart
+ * Retrieve the current activated members from the topology and initialize the timestamp map.
+ * This will allow the system to recover from a restart
*
- * @param topology Topology model object
+ * @param topology Topology model object
*/
- boolean loadTimeStampMapFromTopology(Topology topology){
+ boolean loadTimeStampMapFromTopology(Topology topology) {
long currentTimeStamp = System.currentTimeMillis();
- if (topology == null || topology.getServices() == null){
+ if (topology == null || topology.getServices() == null) {
return false;
}
// TODO make this efficient by adding APIs to messaging component
@@ -168,21 +165,21 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
}
}
- if (log.isDebugEnabled()){
- log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " +
- memberTimeStampMap);
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap);
}
return true;
}
- private Member getMemberFromId(String memberId){
- if (StringUtils.isEmpty(memberId)){
+ private Member getMemberFromId(String memberId) {
+ if (StringUtils.isEmpty(memberId)) {
return null;
}
- if (TopologyManager.getTopology().isInitialized()){
- try {
+ if (TopologyManager.getTopology().isInitialized()) {
+ try {
TopologyManager.acquireReadLock();
- if (TopologyManager.getTopology().getServices() == null){
+ if (TopologyManager.getTopology().getServices() == null) {
return null;
}
// TODO make this efficient by adding APIs to messaging component
@@ -190,8 +187,8 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
if (service.getClusters() != null) {
for (Cluster cluster : service.getClusters()) {
if (cluster.getMembers() != null) {
- for (Member member : cluster.getMembers()){
- if (memberId.equals(member.getMemberId())){
+ for (Member member : cluster.getMembers()) {
+ if (memberId.equals(member.getMemberId())) {
return member;
}
}
@@ -199,27 +196,24 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
}
}
}
- } catch (Exception e) {
- log.error("Error while reading topology" + e);
- } finally {
- TopologyManager.releaseReadLock();
- }
+ } catch (Exception e) {
+ log.error("Error while reading topology" + e);
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
}
return null;
}
- private void publishMemberFault(String memberId){
- Member member = getMemberFromId(memberId);
- if (member == null){
- log.warn("Failed to publish member fault event. Member having [member-id] " + memberId +
- " does not exist in topology");
+ private void publishMemberFault(Member member) {
+ if (member == null) {
+ log.warn("Failed to publish member fault event. Member object is null");
return;
}
- log.info("Publishing member fault event for [member-id] " + memberId);
+ log.info("Publishing member fault event for [member-id] " + member.getMemberId());
MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(),
- member.getMemberId(), member.getPartitionId(),
- member.getNetworkPartitionId(), 0);
+ member.getMemberId(), member.getPartitionId(), member.getNetworkPartitionId(), 0);
memberFaultEventMessageMap.put("message", memberFaultEvent);
healthStatPublisher.publish(MemberFaultEventMap, true);
@@ -229,19 +223,23 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
public void run() {
try {
threadBarrier.pass();
-
for (Object o : memberTimeStampMap.entrySet()) {
Map.Entry pair = (Map.Entry) o;
long currentTime = System.currentTimeMillis();
Long eventTimeStamp = (Long) pair.getValue();
if ((currentTime - eventTimeStamp) > TIME_OUT) {
- log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
- eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
- publishMemberFault((String) pair.getKey());
+ String memberId = (String) pair.getKey();
+ Member member = getMemberFromId(memberId);
+ if (member != null) {
+ log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
+ eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
+ publishMemberFault(member);
+ }
+ memberTimeStampMap.remove(memberId);
}
}
- if (log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
}
@@ -257,7 +255,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
@Override
protected Object[] currentState() {
- return new Object[]{window.currentState()};
+ return new Object[] { window.currentState() };
}
@Override
@@ -269,7 +267,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
@Override
protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
- AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+ AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
if (parameters[0] instanceof IntConstant) {
timeToKeep = ((IntConstant) parameters[0]).getValue();
@@ -285,16 +283,17 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
} else {
window = new SchedulerSiddhiQueue<StreamEvent>(this);
}
- MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
+ MemberFaultEventMap
+ .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
- executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY,
- CEP_EXTENSION_THREAD_POOL_SIZE);
- cepTopologyEventReceiver.setExecutorService(executorService);
+ executorService = StratosThreadPool
+ .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
+ cepTopologyEventReceiver.setExecutorService(executorService);
cepTopologyEventReceiver.execute();
//Ordinary scheduling
window.schedule();
- if (log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
@@ -328,13 +327,13 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
}
@Override
- public void destroy(){
+ public void destroy() {
// terminate topology listener thread
cepTopologyEventReceiver.terminate();
window = null;
// Shutdown executor service
- if(executorService != null) {
+ if (executorService != null) {
try {
executorService.shutdownNow();
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/b80bfc4f/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 305eda8..7d8a2a5 100644
--- a/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/modules/stratos-cep-extension/wso2cep-3.1.0/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -25,10 +25,10 @@ import org.wso2.siddhi.core.config.SiddhiContext;
import org.wso2.siddhi.core.event.StreamEvent;
import org.wso2.siddhi.core.event.in.InEvent;
import org.wso2.siddhi.core.event.in.InListEvent;
-import org.wso2.siddhi.core.snapshot.ThreadBarrier;
import org.wso2.siddhi.core.query.QueryPostProcessingElement;
import org.wso2.siddhi.core.query.processor.window.RunnableWindowProcessor;
import org.wso2.siddhi.core.query.processor.window.WindowProcessor;
+import org.wso2.siddhi.core.snapshot.ThreadBarrier;
import org.wso2.siddhi.core.util.collection.queue.scheduler.ISchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueue;
import org.wso2.siddhi.core.util.collection.queue.scheduler.SchedulerSiddhiQueueGrid;
@@ -42,20 +42,17 @@ import org.wso2.siddhi.query.api.extension.annotation.SiddhiExtension;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
/**
* CEP window processor to handle faulty member instances. This window processor is responsible for
* publishing MemberFault event if health stats are not received within a given time window.
*/
-@SiddhiExtension(namespace = "stratos", function = "faultHandling")
+@SiddhiExtension(namespace = "stratos",
+ function = "faultHandling")
public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
- private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
+ private static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
private static final int TIME_OUT = 60 * 1000;
public static final String CEP_EXTENSION_THREAD_POOL_KEY = "cep.extension.thread.pool";
@@ -64,57 +61,57 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
private ExecutorService executorService;
private ScheduledExecutorService faultHandleScheduler;
private ScheduledFuture<?> lastSchedule;
- private ThreadBarrier threadBarrier;
- private long timeToKeep;
- private ISchedulerSiddhiQueue<StreamEvent> window;
- private EventPublisher healthStatPublisher =
- EventPublisherPool.getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
- private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
- private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
-
- // Map of member id's to their last received health event time stamp
- private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
-
- // Event receiver to receive topology events published by cloud-controller
- private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
-
- // Stratos member id attribute index in stream execution plan
- private int memberIdAttrIndex;
-
- @Override
- protected void processEvent(InEvent event) {
- addDataToMap(event);
- }
-
- @Override
- protected void processEvent(InListEvent listEvent) {
- for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
- addDataToMap((InEvent) listEvent.getEvent(i));
- }
- }
-
- /**
- * Add new entry to time stamp map from the received event.
- *
- * @param event Event received by Siddhi.
- */
- protected void addDataToMap(InEvent event) {
- String id = (String) event.getData()[memberIdAttrIndex];
- //checking whether this member is the topology.
- //sometimes there can be a delay between publishing member terminated events
- //and actually terminating instances. Hence CEP might get events for already terminated members
- //so we are checking the topology for the member existence
- Member member = getMemberFromId(id);
- if (null == member) {
- log.debug("Member not found in the topology. Event rejected");
- return;
- }
+ private ThreadBarrier threadBarrier;
+ private long timeToKeep;
+ private ISchedulerSiddhiQueue<StreamEvent> window;
+ private EventPublisher healthStatPublisher = EventPublisherPool
+ .getPublisher(MessagingUtil.Topics.HEALTH_STAT_TOPIC.getTopicName());
+ private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
+ private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
+
+ // Map of member id's to their last received health event time stamp
+ private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+
+ // Event receiver to receive topology events published by cloud-controller
+ private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
+
+ // Stratos member id attribute index in stream execution plan
+ private int memberIdAttrIndex;
+
+ @Override
+ protected void processEvent(InEvent event) {
+ addDataToMap(event);
+ }
+
+ @Override
+ protected void processEvent(InListEvent listEvent) {
+ for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+ addDataToMap((InEvent) listEvent.getEvent(i));
+ }
+ }
+
+ /**
+ * Add new entry to time stamp map from the received event.
+ *
+ * @param event Event received by Siddhi.
+ */
+ protected void addDataToMap(InEvent event) {
+ String id = (String) event.getData()[memberIdAttrIndex];
+ //checking whether this member is the topology.
+ //sometimes there can be a delay between publishing member terminated events
+ //and actually terminating instances. Hence CEP might get events for already terminated members
+ //so we are checking the topology for the member existence
+ Member member = getMemberFromId(id);
+ if (null == member) {
+ log.warn("Member not found in the topology. Event rejected");
+ return;
+ }
if (StringUtils.isNotEmpty(id)) {
memberTimeStampMap.put(id, event.getTimeStamp());
} else {
log.warn("NULL member id found in the event received. Event rejected.");
}
- if (log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
}
}
@@ -134,15 +131,15 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
}
/**
- * Retrieve the current activated members from the topology and initialize the timestamp map.
- * This will allow the system to recover from a restart
+ * Retrieve the current activated members from the topology and initialize the timestamp map.
+ * This will allow the system to recover from a restart
*
- * @param topology Topology model object
+ * @param topology Topology model object
*/
- boolean loadTimeStampMapFromTopology(Topology topology){
+ boolean loadTimeStampMapFromTopology(Topology topology) {
long currentTimeStamp = System.currentTimeMillis();
- if (topology == null || topology.getServices() == null){
+ if (topology == null || topology.getServices() == null) {
return false;
}
// TODO make this efficient by adding APIs to messaging component
@@ -162,21 +159,21 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
}
}
- if (log.isDebugEnabled()){
- log.debug("Member timestamps were successfully loaded from the topology: [timestamps] " +
- memberTimeStampMap);
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "Member timestamps were successfully loaded from the topology: [timestamps] " + memberTimeStampMap);
}
return true;
}
- private Member getMemberFromId(String memberId){
- if (StringUtils.isEmpty(memberId)){
+ private Member getMemberFromId(String memberId) {
+ if (StringUtils.isEmpty(memberId)) {
return null;
}
- if (TopologyManager.getTopology().isInitialized()){
- try {
+ if (TopologyManager.getTopology().isInitialized()) {
+ try {
TopologyManager.acquireReadLock();
- if (TopologyManager.getTopology().getServices() == null){
+ if (TopologyManager.getTopology().getServices() == null) {
return null;
}
// TODO make this efficient by adding APIs to messaging component
@@ -184,8 +181,8 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
if (service.getClusters() != null) {
for (Cluster cluster : service.getClusters()) {
if (cluster.getMembers() != null) {
- for (Member member : cluster.getMembers()){
- if (memberId.equals(member.getMemberId())){
+ for (Member member : cluster.getMembers()) {
+ if (memberId.equals(member.getMemberId())) {
return member;
}
}
@@ -193,27 +190,24 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
}
}
}
- } catch (Exception e) {
- log.error("Error while reading topology" + e);
- } finally {
- TopologyManager.releaseReadLock();
- }
+ } catch (Exception e) {
+ log.error("Error while reading topology" + e);
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
}
return null;
}
- private void publishMemberFault(String memberId){
- Member member = getMemberFromId(memberId);
- if (member == null){
- log.warn("Failed to publish member fault event. Member having [member-id] " + memberId +
- " does not exist in topology");
+ private void publishMemberFault(Member member) {
+ if (member == null) {
+ log.warn("Failed to publish member fault event. Member object is null");
return;
}
- log.info("Publishing member fault event for [member-id] " + memberId);
+ log.info("Publishing member fault event for [member-id] " + member.getMemberId());
MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getClusterInstanceId(),
- member.getMemberId(), member.getPartitionId(),
- member.getNetworkPartitionId(), 0);
+ member.getMemberId(), member.getPartitionId(), member.getNetworkPartitionId(), 0);
memberFaultEventMessageMap.put("message", memberFaultEvent);
healthStatPublisher.publish(MemberFaultEventMap, true);
@@ -223,19 +217,23 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
public void run() {
try {
threadBarrier.pass();
-
for (Object o : memberTimeStampMap.entrySet()) {
Map.Entry pair = (Map.Entry) o;
long currentTime = System.currentTimeMillis();
Long eventTimeStamp = (Long) pair.getValue();
if ((currentTime - eventTimeStamp) > TIME_OUT) {
- log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
- eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
- publishMemberFault((String) pair.getKey());
+ String memberId = (String) pair.getKey();
+ Member member = getMemberFromId(memberId);
+ if (member != null) {
+ log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
+ eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
+ publishMemberFault(member);
+ }
+ memberTimeStampMap.remove(memberId);
}
}
- if (log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
}
@@ -251,7 +249,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
@Override
protected Object[] currentState() {
- return new Object[]{window.currentState()};
+ return new Object[] { window.currentState() };
}
@Override
@@ -262,7 +260,7 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
@Override
protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
- AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+ AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
if (parameters[0] instanceof IntConstant) {
timeToKeep = ((IntConstant) parameters[0]).getValue();
@@ -278,16 +276,17 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
} else {
window = new SchedulerSiddhiQueue<StreamEvent>(this);
}
- MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
+ MemberFaultEventMap
+ .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
- executorService = StratosThreadPool.getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY,
- CEP_EXTENSION_THREAD_POOL_SIZE);
- cepTopologyEventReceiver.setExecutorService(executorService);
+ executorService = StratosThreadPool
+ .getExecutorService(CEP_EXTENSION_THREAD_POOL_KEY, CEP_EXTENSION_THREAD_POOL_SIZE);
+ cepTopologyEventReceiver.setExecutorService(executorService);
cepTopologyEventReceiver.execute();
//Ordinary scheduling
window.schedule();
- if (log.isDebugEnabled()){
+ if (log.isDebugEnabled()) {
log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
@@ -321,13 +320,13 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
}
@Override
- public void destroy(){
+ public void destroy() {
// terminate topology listener thread
cepTopologyEventReceiver.terminate();
window = null;
// Shutdown executor service
- if(executorService != null) {
+ if (executorService != null) {
try {
executorService.shutdownNow();
} catch (Exception e) {