You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/10/27 09:44:36 UTC
[2/2] git commit: fixing cep listening on topology issue by merging
with master
fixing cep listening on topology issue by merging with master
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/7a5797df
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/7a5797df
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/7a5797df
Branch: refs/heads/4.0.0-grouping
Commit: 7a5797df08bbf92483cd5993e6ce9ca576686e3d
Parents: bea691b
Author: reka <rt...@gmail.com>
Authored: Mon Oct 27 14:10:15 2014 +0530
Committer: reka <rt...@gmail.com>
Committed: Mon Oct 27 14:14:22 2014 +0530
----------------------------------------------------------------------
.../cep/extension/CEPTopologyEventReceiver.java | 125 +++++++++++
.../extension/FaultHandlingWindowProcessor.java | 217 ++++++++++++-------
2 files changed, 267 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/7a5797df/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
new file mode 100644
index 0000000..90c67f0
--- /dev/null
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/CEPTopologyEventReceiver.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.cep.extension;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * CEP Topology Receiver for Fault Handling Window Processor.
+ */
+public class CEPTopologyEventReceiver implements Runnable {
+
+ private static final Log log = LogFactory.getLog(CEPTopologyEventReceiver.class);
+
+ private TopologyEventReceiver topologyEventReceiver;
+ private boolean terminated;
+ private FaultHandlingWindowProcessor faultHandler;
+
+ public CEPTopologyEventReceiver(FaultHandlingWindowProcessor faultHandler) {
+ this.topologyEventReceiver = new TopologyEventReceiver();
+ this.faultHandler = faultHandler;
+ addEventListeners();
+ }
+
+ private void addEventListeners() {
+ // Load member time stamp map from the topology as a one time task
+ topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+ private boolean initialized;
+
+ @Override
+ protected void onEvent(Event event) {
+ if (!initialized) {
+ try {
+ TopologyManager.acquireReadLock();
+ log.info("Complete topology event received to fault handling window processor.");
+ CompleteTopologyEvent completeTopologyEvent = (CompleteTopologyEvent) event;
+ initialized = faultHandler.loadTimeStampMapFromTopology(completeTopologyEvent.getTopology());
+ } catch (Exception e) {
+ log.error("Error loading member time stamp map from complete topology event.", e);
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+ }
+ });
+
+ // Remove member from the time stamp map when MemberTerminated event is received.
+ topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+ faultHandler.getMemberTimeStampMap().remove(memberTerminatedEvent.getMemberId());
+ log.info("Member [member id] " + memberTerminatedEvent.getMemberId() +
+ " was removed from the time stamp map.");
+ }
+ });
+
+ // Add member to time stamp map whenever member is activated
+ topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+
+ // do not put this member if we have already received a health event
+ faultHandler.getMemberTimeStampMap().putIfAbsent(memberActivatedEvent.getMemberId(), System.currentTimeMillis());
+ log.info("Member [member id] " + memberActivatedEvent.getMemberId() +
+ " was added to the time stamp map.");
+ }
+ });
+ }
+
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(15000);
+ } catch (InterruptedException ignore) {
+ }
+ Thread thread = new Thread(topologyEventReceiver);
+ thread.start();
+ log.info("CEP topology receiver thread started");
+
+ // Keep the thread live until terminated
+ while (!terminated) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ log.info("CEP topology receiver thread terminated");
+ }
+
+ /**
+ * Terminate CEP topology receiver thread.
+ */
+ public void terminate() {
+ topologyEventReceiver.terminate();
+ terminated = true;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/7a5797df/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 80174f4..0104a03 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -18,14 +18,19 @@
*/
package org.apache.stratos.cep.extension;
+import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Logger;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.MemberStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.event.health.stat.MemberFaultEvent;
+import org.apache.stratos.messaging.event.topology.CompleteTopologyEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberTerminatedEventListener;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.util.Constants;
@@ -52,23 +57,31 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+/**
+ * CEP window processor to handle faulty member instances. This window processor is responsible for
+ * publishing MemberFault event if health stats are not received within a given time window.
+ */
@SiddhiExtension(namespace = "stratos", function = "faultHandling")
public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
private static final int TIME_OUT = 60 * 1000;
static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
- private ScheduledExecutorService eventRemoverScheduler;
- private int subjectedAttrIndex;
+ private ScheduledExecutorService faultHandleScheduler;
private ThreadBarrier threadBarrier;
private long timeToKeep;
private ISchedulerSiddhiQueue<StreamEvent> window;
+ private EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC);
+ private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
+ private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
+
+ // Map of member id's to their last received health event time stamp
private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
- private ConcurrentHashMap<String, Member> memberIdMap = new ConcurrentHashMap<String, Member>();
- EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(Constants.HEALTH_STAT_TOPIC);
- Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
- Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
- private TopologyEventReceiver topologyEventReceiver;
- private String memberID;
+
+ // Event receiver to receive topology events published by cloud-controller
+ private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
+
+ // Stratos member id attribute index in stream execution plan
+ private int memberIdAttrIndex;
@Override
protected void processEvent(InEvent event) {
@@ -77,20 +90,34 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
@Override
protected void processEvent(InListEvent listEvent) {
- System.out.println(listEvent);
for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
addDataToMap((InEvent) listEvent.getEvent(i));
}
}
+ /**
+ * Add new entry to time stamp map from the received event.
+ *
+ * @param event Event received by Siddhi.
+ */
protected void addDataToMap(InEvent event) {
- if (memberID != null) {
- String id = (String)event.getData()[subjectedAttrIndex];
+ String id = (String) event.getData()[memberIdAttrIndex];
+ //checking whether this member is the topology.
+ //sometimes there can be a delay between publishing member terminated events
+ //and actually terminating instances. Hence CEP might get events for already terminated members
+ //so we are checking the topology for the member existence
+ Member member = getMemberFromId(id);
+ if (null == member) {
+ log.debug("Member not found in the toplogy. Event rejected");
+ return;
+ }
+ if (StringUtils.isNotEmpty(id)) {
memberTimeStampMap.put(id, event.getTimeStamp());
- log.debug("Event received from [member-id] " + id);
+ } else {
+ log.warn("NULL member id found in the event received. Event rejected.");
}
- else {
- log.error("NULL member ID in the event received");
+ if (log.isDebugEnabled()){
+ log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
}
}
@@ -108,55 +135,88 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
}
}
- /*
- * Retrieve the current activated member list from the topology and put them into the
- * memberTimeStampMap if not already exists. This will allow the system to recover
- * from any inconsistent state caused by MB/CEP failures.
- */
- private void loadFromTopology(){
- if (TopologyManager.getTopology().isInitialized()){
- TopologyManager.acquireReadLock();
- memberIdMap.clear();
- long currentTimeStamp = System.currentTimeMillis();
- Iterator<Service> servicesItr = TopologyManager.getTopology().getServices().iterator();
- while(servicesItr.hasNext()){
- Service service = servicesItr.next();
- Iterator<Cluster> clusterItr = service.getClusters().iterator();
- while(clusterItr.hasNext()){
- Cluster cluster = clusterItr.next();
- Iterator<Member> memberItr = cluster.getMembers().iterator();
- while(memberItr.hasNext()){
- Member member = memberItr.next();
- if (member.getStatus().equals(MemberStatus.Activated)){
- memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
- memberIdMap.put(member.getMemberId(), member);
+ /**
+ * Retrieve the current activated members from the topology and initialize the time stamp map.
+ * This will allow the system to recover from a restart
+ *
+ * @param topology Topology model object
+ */
+ boolean loadTimeStampMapFromTopology(Topology topology){
+
+ long currentTimeStamp = System.currentTimeMillis();
+ if (topology == null || topology.getServices() == null){
+ return false;
+ }
+ // TODO make this efficient by adding APIs to messaging component
+ for (Service service : topology.getServices()) {
+ if (service.getClusters() != null) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.getMembers() != null) {
+ for (Member member : cluster.getMembers()) {
+ // we are checking faulty status only in previously activated members
+ if (member != null && MemberStatus.Activated.equals(member.getStatus())) {
+ // Initialize the member time stamp map from the topology at the beginning
+ memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
+ }
}
}
}
}
- TopologyManager.releaseReadLock();
}
+
+ log.info("Member time stamp map was successfully loaded from the topology.");
if (log.isDebugEnabled()){
log.debug("Member TimeStamp Map: " + memberTimeStampMap);
- log.debug("Member ID Map: " + memberIdMap);
}
+ return true;
}
- private void publishMemberFault(String memberID){
- Member member = memberIdMap.get(memberID);
+    /**
+     * Looks up a member by id in the topology held by {@link TopologyManager}.
+     *
+     * @param memberId id of the member to find
+     * @return the matching member, or null when the id is empty, the topology is not
+     *         initialized, no member has that id, or reading the topology fails
+     */
+    private Member getMemberFromId(String memberId) {
+        if (StringUtils.isEmpty(memberId) || !TopologyManager.getTopology().isInitialized()) {
+            return null;
+        }
+        // Acquire outside the try block so that finally only releases a lock that is held.
+        TopologyManager.acquireReadLock();
+        try {
+            if (TopologyManager.getTopology().getServices() == null) {
+                return null;
+            }
+            // TODO make this efficient by adding APIs to messaging component
+            for (Service service : TopologyManager.getTopology().getServices()) {
+                if (service.getClusters() == null) {
+                    continue;
+                }
+                for (Cluster cluster : service.getClusters()) {
+                    if (cluster.getMembers() == null) {
+                        continue;
+                    }
+                    for (Member member : cluster.getMembers()) {
+                        // Null guard matches loadTimeStampMapFromTopology, which also tolerates null members.
+                        if (member != null && memberId.equals(member.getMemberId())) {
+                            return member;
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            // Pass the exception as the throwable argument so its stack trace is logged.
+            log.error("Error while reading topology", e);
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+        return null;
+    }
+
+ private void publishMemberFault(String memberId){
+ Member member = getMemberFromId(memberId);
if (member == null){
- log.error("Failed to publish MemberFault event. Member having [member-id] " + memberID + " does not exist in topology");
+ log.error("Failed to publish member fault event. Member having [member-id] " + memberId +
+ " does not exist in topology");
return;
}
- MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getMemberId(), member.getPartitionId(), 0);
- memberFaultEventMessageMap.put("message", memberFaultEvent);
- Properties headers = new Properties();
- headers.put(Constants.EVENT_CLASS_NAME, memberFaultEvent.getClass().getName());
- healthStatPublisher.publish(MemberFaultEventMap, headers, true);
+ log.info("Publishing member fault event for [member-id] " + memberId);
- if (log.isDebugEnabled()){
- log.debug("Published MemberFault event for [member-id] " + memberID);
- }
+ MemberFaultEvent memberFaultEvent = new MemberFaultEvent(member.getClusterId(), member.getMemberId(),
+ member.getPartitionId(), 0);
+ memberFaultEventMessageMap.put("message", memberFaultEvent);
+ healthStatPublisher.publish(MemberFaultEventMap, true);
}
@@ -164,26 +224,26 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
public void run() {
try {
threadBarrier.pass();
- loadFromTopology();
- Iterator it = memberTimeStampMap.entrySet().iterator();
- while ( it.hasNext() ) {
- Map.Entry pair = (Map.Entry)it.next();
+ for (Object o : memberTimeStampMap.entrySet()) {
+ Map.Entry pair = (Map.Entry) o;
long currentTime = System.currentTimeMillis();
Long eventTimeStamp = (Long) pair.getValue();
if ((currentTime - eventTimeStamp) > TIME_OUT) {
- log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " + eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
- it.remove();
+ log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
+ eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
publishMemberFault((String) pair.getKey());
}
}
if (log.isDebugEnabled()){
- log.debug("Fault handling processor iteration completed with [time-stamp map length] " + memberTimeStampMap.size() + " [activated member-count] " + memberIdMap.size());
+ log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
+ memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
}
- eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
log.error(t.getMessage(), t);
+ } finally {
+ faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
}
}
@@ -200,17 +260,16 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
}
@Override
- protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor, AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
+ protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
+ AbstractDefinition streamDefinition, String elementId, boolean async, SiddhiContext siddhiContext) {
if (parameters[0] instanceof IntConstant) {
timeToKeep = ((IntConstant) parameters[0]).getValue();
} else {
timeToKeep = ((LongConstant) parameters[0]).getValue();
}
- memberID = ((Variable)parameters[1]).getAttributeName();
-
- String subjectedAttr = ((Variable)parameters[1]).getAttributeName();
- subjectedAttrIndex = streamDefinition.getAttributePosition(subjectedAttr);
+ String memberIdAttrName = ((Variable) parameters[1]).getAttributeName();
+ memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName);
if (this.siddhiContext.isDistributedProcessingEnabled()) {
window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
@@ -218,29 +277,32 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
window = new SchedulerSiddhiQueue<StreamEvent>(this);
}
MemberFaultEventMap.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
- this.topologyEventReceiver = new TopologyEventReceiver();
- Thread thread = new Thread(topologyEventReceiver);
- thread.start();
- log.info("WSO2 CEP topology receiver thread started");
+
+ Thread topologyTopicSubscriberThread = new Thread(cepTopologyEventReceiver);
+ topologyTopicSubscriberThread.start();
//Ordinary scheduling
window.schedule();
-
+ if (log.isDebugEnabled()){
+ log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
+ ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
+ ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
+ }
}
@Override
public void schedule() {
- eventRemoverScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+ faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
}
@Override
public void scheduleNow() {
- eventRemoverScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+ faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
}
@Override
public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
- this.eventRemoverScheduler = scheduledExecutorService;
+ this.faultHandleScheduler = scheduledExecutorService;
}
@Override
@@ -250,7 +312,12 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
@Override
public void destroy(){
- this.topologyEventReceiver.terminate();
+ // terminate topology listener thread
+ cepTopologyEventReceiver.terminate();
window = null;
}
+
+ public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+ return memberTimeStampMap;
+ }
}