You are viewing a plain-text version of this content. The link to the canonical version (originally "here") did not survive the plain-text conversion.
Posted to commits@sling.apache.org by st...@apache.org on 2015/10/12 15:54:15 UTC

svn commit: r1708119 - in /sling/trunk/bundles/extensions/discovery/impl: ./ src/main/java/org/apache/sling/discovery/impl/ src/main/java/org/apache/sling/discovery/impl/common/heartbeat/ src/main/java/org/apache/sling/discovery/impl/topology/ src/test...

Author: stefanegli
Date: Mon Oct 12 13:54:14 2015
New Revision: 1708119

URL: http://svn.apache.org/viewvc?rev=1708119&view=rev
Log:
SLING-5094 : refactoring DiscoveryServiceImpl to use ViewStateManager of discovery.commons

Modified:
    sling/trunk/bundles/extensions/discovery/impl/pom.xml
    sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
    sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java
    sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyChangeHandler.java
    sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyViewImpl.java
    sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java
    sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java
    sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/TopologyEventTest.java
    sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/helpers/AssertingTopologyEventListener.java
    sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatTest.java

Modified: sling/trunk/bundles/extensions/discovery/impl/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/pom.xml?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/pom.xml (original)
+++ sling/trunk/bundles/extensions/discovery/impl/pom.xml Mon Oct 12 13:54:14 2015
@@ -128,6 +128,12 @@
 		</dependency>
 		<dependency>
 			<groupId>org.apache.sling</groupId>
+			<artifactId>org.apache.sling.discovery.commons</artifactId>
+			<version>1.0.0-SNAPSHOT</version>
+            <scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.sling</groupId>
 			<artifactId>org.apache.sling.api</artifactId>
 			<version>2.4.0</version>
             <scope>provided</scope>

Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java Mon Oct 12 13:54:14 2015
@@ -20,20 +20,18 @@ package org.apache.sling.discovery.impl;
 
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Date;
 import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Hashtable;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -53,10 +51,12 @@ import org.apache.sling.discovery.Cluste
 import org.apache.sling.discovery.DiscoveryService;
 import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.discovery.PropertyProvider;
-import org.apache.sling.discovery.TopologyEvent;
-import org.apache.sling.discovery.TopologyEvent.Type;
 import org.apache.sling.discovery.TopologyEventListener;
 import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.ViewStateManager;
+import org.apache.sling.discovery.commons.providers.impl.ViewStateManagerFactory;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
 import org.apache.sling.discovery.impl.cluster.ClusterViewService;
 import org.apache.sling.discovery.impl.cluster.UndefinedClusterViewException;
 import org.apache.sling.discovery.impl.cluster.UndefinedClusterViewException.Reason;
@@ -85,157 +85,11 @@ public class DiscoveryServiceImpl implem
 
     private final static Logger logger = LoggerFactory.getLogger(DiscoveryServiceImpl.class);
 
-    /** SLING-4755 : encapsulates an event that yet has to be sent (asynchronously) for a particular listener **/
-    private final static class AsyncEvent {
-        private final TopologyEventListener listener;
-        private final TopologyEvent event;
-        AsyncEvent(TopologyEventListener listener, TopologyEvent event) {
-            if (listener==null) {
-                throw new IllegalArgumentException("listener must not be null");
-            }
-            if (event==null) {
-                throw new IllegalArgumentException("event must not be null");
-            }
-            this.listener = listener;
-            this.event = event;
-        }
-        @Override
-        public String toString() {
-            return "an AsyncEvent[event="+event+", listener="+listener+"]";
-        }
-    }
-    
-    /** 
-     * SLING-4755 : background runnable that takes care of asynchronously sending events.
-     * <p>
-     * API is: enqueue() puts a listener-event tuple onto the internal Q, which
-     * is processed in a loop in run that does so (uninterruptably, even catching
-     * Throwables to be 'very safe', but sleeps 5sec if an Error happens) until
-     * flushThenStop() is called - which puts the sender in a state where any pending
-     * events are still sent (flush) but then stops automatically. The argument of
-     * using flush before stop is that the event was originally meant to be sent
-     * before the bundle was stopped - thus just because the bundle is stopped
-     * doesn't undo the event and it still has to be sent. That obviously can
-     * mean that listeners can receive a topology event after deactivate. But I
-     * guess that was already the case before the change to become asynchronous.
-     */
-    private final static class AsyncEventSender implements Runnable {
-        
-        /** stopped is always false until flushThenStop is called **/
-        private boolean stopped = false;
-
-        /** eventQ contains all AsyncEvent objects that have yet to be sent - in order to be sent **/
-        private final List<AsyncEvent> eventQ = new LinkedList<AsyncEvent>();
-        
-        /** Enqueues a particular event for asynchronous sending to a particular listener **/
-        void enqueue(TopologyEventListener listener, TopologyEvent event) {
-            final AsyncEvent asyncEvent = new AsyncEvent(listener, event);
-            synchronized(eventQ) {
-                eventQ.add(asyncEvent);
-                if (logger.isDebugEnabled()) {
-                    logger.debug("enqueue: enqueued event {} for async sending (Q size: {})", asyncEvent, eventQ.size());
-                }
-                eventQ.notifyAll();
-            }
-        }
-        
-        /**
-         * Stops the AsyncEventSender as soon as the queue is empty
-         */
-        void flushThenStop() {
-            synchronized(eventQ) {
-                logger.info("AsyncEventSender.flushThenStop: flushing (size: {}) & stopping...", eventQ.size());
-                stopped = true;
-                eventQ.notifyAll();
-            }
-        }
-        
-        /** Main worker loop that dequeues from the eventQ and calls sendTopologyEvent with each **/
-        public void run() {
-            logger.info("AsyncEventSender.run: started.");
-            try{
-                while(true) {
-                    try{
-                        final AsyncEvent asyncEvent;
-                        synchronized(eventQ) {
-                            while(!stopped && eventQ.isEmpty()) {
-                                try {
-                                    eventQ.wait();
-                                } catch (InterruptedException e) {
-                                    // issue a log debug but otherwise continue
-                                    logger.debug("AsyncEventSender.run: interrupted while waiting for async events");
-                                }
-                            }
-                            if (stopped) {
-                                if (eventQ.isEmpty()) {
-                                    // then we have flushed, so we can now finally stop
-                                    logger.info("AsyncEventSender.run: flush finished. stopped.");
-                                    return;
-                                } else {
-                                    // otherwise the eventQ is not yet empty, so we are still in flush mode
-                                    logger.info("AsyncEventSender.run: flushing another event. (pending {})", eventQ.size());
-                                }
-                            }
-                            asyncEvent = eventQ.remove(0);
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("AsyncEventSender.run: dequeued event {}, remaining: {}", asyncEvent, eventQ.size());
-                            }
-                        }
-                        if (asyncEvent!=null) {
-                            sendTopologyEvent(asyncEvent);
-                        }
-                    } catch(Throwable th) {
-                        // Even though we should never catch Error or RuntimeException
-                        // here's the thinking about doing it anyway:
-                        //  * in case of a RuntimeException that would be less dramatic
-                        //    and catching it is less of an issue - we rather want
-                        //    the background thread to be able to continue than
-                        //    having it finished just because of a RuntimeException
-                        //  * catching an Error is of course not so nice.
-                        //    however, should we really give up this thread even in
-                        //    case of an Error? It could be an OOM or some other 
-                        //    nasty one, for sure. But even if. Chances are that
-                        //    other parts of the system would also get that Error
-                        //    if it is very dramatic. If not, then catching it
-                        //    sounds feasible. 
-                        // My two cents..
-                        // the goal is to avoid quitting the AsyncEventSender thread
-                        logger.error("AsyncEventSender.run: Throwable occurred. Sleeping 5sec. Throwable: "+th, th);
-                        try {
-                            Thread.sleep(5000);
-                        } catch (InterruptedException e) {
-                            logger.warn("AsyncEventSender.run: interrupted while sleeping");
-                        }
-                    }
-                }
-            } finally {
-                logger.info("AsyncEventSender.run: quits (finally).");
-            }
-        }
-
-        /** Actual sending of the asynchronous event - catches RuntimeExceptions a listener can send. (Error is caught outside) **/
-        private void sendTopologyEvent(AsyncEvent asyncEvent) {
-            final TopologyEventListener listener = asyncEvent.listener;
-            final TopologyEvent event = asyncEvent.event;
-            logger.debug("sendTopologyEvent: start: listener: {}, event: {}", listener, event);
-            try{
-                listener.handleTopologyEvent(event);
-            } catch(final Exception e) {
-                logger.warn("sendTopologyEvent: handler threw exception. handler: "+listener+", exception: "+e, e);
-            }
-            logger.debug("sendTopologyEvent: start: listener: {}, event: {}", listener, event);
-        }
-        
-    }
-    
     @Reference
     private SlingSettingsService settingsService;
 
     @Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, policy = ReferencePolicy.DYNAMIC, referenceInterface = TopologyEventListener.class)
     private TopologyEventListener[] eventListeners = new TopologyEventListener[0];
-
-    /** SLING-5030 : this map contains the event last sent to each listener to prevent duplicate CHANGING events when scheduler is broken**/
-    private Map<TopologyEventListener,TopologyEvent.Type> lastEventMap = new HashMap<TopologyEventListener, TopologyEvent.Type>();
     
     /**
      * All property providers.
@@ -252,9 +106,6 @@ public class DiscoveryServiceImpl implem
      **/
     private boolean activated = false;
 
-    /** SLING-3750 : set when activate() could not send INIT events due to being in isolated mode **/
-    private boolean initEventDelayed = false;
-    
     @Reference
     private ResourceResolverFactory resourceResolverFactory;
 
@@ -282,20 +133,24 @@ public class DiscoveryServiceImpl implem
     /** the old view previously valid and sent to the TopologyEventListeners **/
     private TopologyViewImpl oldView;
 
-    /** 
-     * whether or not there is a delayed event sending pending.
-     * Marked volatile to allow getTopology() to read this without need for
-     * synchronized(lock) (which would be deadlock-prone). (introduced with SLING-4638).
-     **/
-    private volatile boolean delayedEventPending = false;
-    
-    /** used to continue functioning when scheduler is broken **/
-    private volatile boolean delayedEventPendingFailed = false;
-
     private ServiceRegistration mbeanRegistration;
 
-    /** SLING-4755 : reference to the background AsyncEventSender. Started/stopped in activate/deactivate **/
-    private AsyncEventSender asyncEventSender;
+    private ViewStateManager viewStateManager;
+
+    private ReentrantLock viewStateManagerLock; 
+    
+    public DiscoveryServiceImpl() {
+        viewStateManagerLock = new ReentrantLock();
+        final ConsistencyService consistencyService = new ConsistencyService() {
+
+            @Override
+            public void sync(BaseTopologyView view, Runnable callback) {
+                logger.debug("sync: no syncToken applicable");
+                callback.run();
+            }
+        };
+        viewStateManager = ViewStateManagerFactory.newViewStateManager(viewStateManagerLock, consistencyService);
+    }
 
     protected void registerMBean(BundleContext bundleContext) {
         if (this.mbeanRegistration!=null) {
@@ -323,6 +178,7 @@ public class DiscoveryServiceImpl implem
         if (view==null) {
             throw new IllegalArgumentException("view must not be null");
         }
+        logger.debug("setOldView: oldView is now: {}", oldView);
         oldView = view;
     }
     
@@ -342,6 +198,10 @@ public class DiscoveryServiceImpl implem
 
         slingId = settingsService.getSlingId();
 
+        if (config.getMinEventDelay()>0) {
+            viewStateManager.installMinEventDelayHandler(this, scheduler, config.getMinEventDelay());
+        }
+
         final String isolatedClusterId = UUID.randomUUID().toString();
         {
             // create a pre-voting/isolated topologyView which would be used
@@ -357,11 +217,11 @@ public class DiscoveryServiceImpl implem
             col.add(isolatedInstance);
             final TopologyViewImpl topology = new TopologyViewImpl();
             topology.addInstances(col);
-            topology.markOld();
+            topology.setNotCurrent();
             setOldView(topology);
         }
         setOldView((TopologyViewImpl) getTopology());
-        oldView.markOld();
+        oldView.setNotCurrent();
 
         // make sure the first heartbeat is issued as soon as possible - which
         // is right after this service starts. since the two (discoveryservice
@@ -369,35 +229,25 @@ public class DiscoveryServiceImpl implem
         // is passed on to the heartbeatHandler in this initialize call).
         heartbeatHandler.initialize(this, isolatedClusterId);
 
-        final TopologyEventListener[] registeredServices;
-        synchronized (lock) {
-            // SLING-4755 : start the asyncEventSender in the background
-            //              will be stopped in deactivate (at which point
-            //              all pending events will still be sent but no
-            //              new events can be enqueued)
-            asyncEventSender = new AsyncEventSender();
-            Thread th = new Thread(asyncEventSender);
-            th.setName("Discovery-AsyncEventSender");
-            th.setDaemon(true);
-            th.start();
+        viewStateManagerLock.lock();
+        try{
+            viewStateManager.handleActivated();
 
-            registeredServices = this.eventListeners;
             doUpdateProperties();
 
             TopologyViewImpl newView = (TopologyViewImpl) getTopology();
-            if (!newView.isCurrent()) {
+            if (newView.isCurrent()) {
+                viewStateManager.handleNewView(newView);
+            } else {
                 // SLING-3750: just issue a log.info about the delaying
                 logger.info("activate: this instance is in isolated mode and must yet finish voting before it can send out TOPOLOGY_INIT.");
-                initEventDelayed = true;
-            } else {
-                final TopologyEvent event = new TopologyEvent(Type.TOPOLOGY_INIT, null,
-                        newView);
-                for (final TopologyEventListener da : registeredServices) {
-                    enqueueAsyncTopologyEvent(da, event);
-                }
             }
             activated = true;
             setOldView(newView);
+        } finally {
+            if (viewStateManagerLock!=null) {
+                viewStateManagerLock.unlock();
+            }
         }
 
         URL[] topologyConnectorURLs = config.getTopologyConnectorURLs();
@@ -420,40 +270,20 @@ public class DiscoveryServiceImpl implem
         logger.debug("DiscoveryServiceImpl activated.");
     }
 
-    private void enqueueAsyncTopologyEvent(final TopologyEventListener da, final TopologyEvent event) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("enqueueAsyncTopologyEvent: sending topologyEvent {}, to {}", event, da);
-        }
-        if (asyncEventSender==null) {
-            // this should never happen - sendTopologyEvent should only be called
-            // when activated
-            logger.warn("enqueueAsyncTopologyEvent: asyncEventSender is null, cannot send event ({}, {})!", da, event);
-            return;
-        }
-        if (lastEventMap.get(da)==event.getType() && event.getType()==Type.TOPOLOGY_CHANGING) {
-            // don't sent TOPOLOGY_CHANGING twice
-            logger.debug("enqueueAsyncTopologyEvent: listener already got TOPOLOGY_CHANGING: {}", da);
-            return;
-        }
-        asyncEventSender.enqueue(da, event);
-        lastEventMap.put(da, event.getType());
-        if (logger.isDebugEnabled()) {
-            logger.debug("enqueueAsyncTopologyEvent: sending topologyEvent {}, to {}", event, da);
-        }
-    }
-
     /**
      * Deactivate this service
      */
     @Deactivate
     protected void deactivate() {
         logger.debug("DiscoveryServiceImpl deactivated.");
-        synchronized (lock) {
+        viewStateManagerLock.lock();
+        try{
+            viewStateManager.handleDeactivated();
+            
             activated = false;
-            if (asyncEventSender!=null) {
-                // it should always be not-null though
-                asyncEventSender.flushThenStop();
-                asyncEventSender = null;
+        } finally {
+            if (viewStateManagerLock!=null) {
+                viewStateManagerLock.unlock();
             }
         }
         try{
@@ -470,45 +300,14 @@ public class DiscoveryServiceImpl implem
      * bind a topology event listener
      */
     protected void bindTopologyEventListener(final TopologyEventListener eventListener) {
-
-        logger.debug("bindTopologyEventListener: Binding TopologyEventListener {}",
-                eventListener);
-
-        synchronized (lock) {
-            final List<TopologyEventListener> currentList = new ArrayList<TopologyEventListener>(
-                    Arrays.asList(eventListeners));
-            currentList.add(eventListener);
-            this.eventListeners = currentList
-                    .toArray(new TopologyEventListener[currentList.size()]);
-            if (activated && !initEventDelayed) {
-                final TopologyViewImpl topology = (TopologyViewImpl) getTopology();
-                if (delayedEventPending) {
-                    // that means that for other TopologyEventListeners that were already bound
-                    // and in general: the topology is currently CHANGING
-                    // so we must reflect this with the isCurrent() flag (SLING-4638)
-                    topology.markOld();
-                }
-                enqueueAsyncTopologyEvent(eventListener, new TopologyEvent(
-                        Type.TOPOLOGY_INIT, null, topology));
-            }
-        }
+        viewStateManager.bind(eventListener);
     }
 
     /**
      * Unbind a topology event listener
      */
     protected void unbindTopologyEventListener(final TopologyEventListener eventListener) {
-
-        logger.debug("unbindTopologyEventListener: Releasing TopologyEventListener {}",
-                eventListener);
-
-        synchronized (lock) {
-            final List<TopologyEventListener> currentList = new ArrayList<TopologyEventListener>(
-                    Arrays.asList(eventListeners));
-            currentList.remove(eventListener);
-            this.eventListeners = currentList
-                    .toArray(new TopologyEventListener[currentList.size()]);
-        }
+        viewStateManager.unbind(eventListener);
     }
 
     /**
@@ -688,7 +487,7 @@ public class DiscoveryServiceImpl implem
             // update the announcements but just return
             // the previous oldView marked as !current
             logger.info("getTopology: undefined cluster view: "+e.getReason()+"] "+e);
-            oldView.markOld();
+            oldView.setNotCurrent();
             if (e.getReason()==Reason.ISOLATED_FROM_TOPOLOGY) {
                 if (heartbeatHandler!=null) {
                     // SLING-5030 part 2: when we detect being isolated we should
@@ -734,195 +533,6 @@ public class DiscoveryServiceImpl implem
     }
 
     /**
-     * Internal handle method which checks if anything in the topology has
-     * changed and informs the TopologyEventListeners if such a change occurred.
-     * <p>
-     * All changes should go through this method. This method keeps track of
-     * oldView/newView as well.
-     */
-    private void handlePotentialTopologyChange() {
-        if (!activated) {
-            // ignore this call then - an early call to issue
-            // a topologyevent before even activated
-            logger.debug("handlePotentialTopologyChange: ignoring early change before activate finished.");
-            return;
-        }
-        if (delayedEventPending && !delayedEventPendingFailed) {
-            logger.debug("handlePotentialTopologyChange: ignoring potential change since a delayed event is pending.");
-            return;
-        }
-        TopologyViewImpl newView = (TopologyViewImpl) getTopology();
-        if (initEventDelayed) {
-            // this means activate could not yet send a TOPOLOGY_INIT event
-            // (which can happen frequently) - so we have to do this now
-            // that we potentially have a valid view
-            if (!newView.isCurrent()) {
-                // we cannot proceed until we're out of the isolated mode..
-                // SLING-4535 : while this has warning character, it happens very frequently,
-                //              eg also when binding a PropertyProvider (so normal processing)
-                //              hence lowering to info for now
-                logger.info("handlePotentialTopologyChange: still in isolated mode - cannot send TOPOLOGY_INIT yet.");
-            } else {
-                logger.info("handlePotentialTopologyChange: new view is no longer isolated sending delayed TOPOLOGY_INIT now.");
-                // SLING-4638: OK: newView is current==true as we're just coming out of initEventDelayed first time.
-                enqueueForAll(Type.TOPOLOGY_INIT, null, newView);
-                initEventDelayed = false;
-            }
-            return;
-        }
-
-        TopologyViewImpl oldView = this.oldView;
-        Type difference;
-        if (!newView.isCurrent()) {
-            difference = Type.TOPOLOGY_CHANGING;
-        } else {
-            difference = newView.compareTopology(oldView);
-        }
-        if (difference == null) { // indicating: equals
-            if (delayedEventPendingFailed) {
-                // when the delayed event handling for some very odd reason could
-                // not re-spawn itself (via runAfter) - in that case we now
-                // have listeners in CHANGING state .. which we should wake up
-                enqueueForAll(Type.TOPOLOGY_CHANGED, oldView, newView);
-                delayedEventPendingFailed = false;
-                delayedEventPending = false;
-            } else {
-                // then dont send any event then
-                logger.debug("handlePotentialTopologyChange: identical views. not informing listeners");
-            }
-            return;
-        } else if (difference == Type.PROPERTIES_CHANGED) {
-            enqueueForAll(Type.PROPERTIES_CHANGED, oldView, newView);
-            return;
-        }
-        delayedEventPendingFailed = false;
-        delayedEventPending = false;
-
-        // else: TOPOLOGY_CHANGING or CHANGED
-        if (logger.isDebugEnabled()) {
-            logger.debug("handlePotentialTopologyChange: difference: {}, oldView={}, newView={}",
-                    new Object[] {difference, oldView, newView});
-        }
-
-    	// send a TOPOLOGY_CHANGING first
-        logger.info("handlePotentialTopologyChange: sending "+Type.TOPOLOGY_CHANGING+
-                " to all listeners (that have not gotten one yet) (oldView={}).", oldView);
-        oldView.markOld();
-        for (final TopologyEventListener da : eventListeners) {
-            enqueueAsyncTopologyEvent(da, new TopologyEvent(Type.TOPOLOGY_CHANGING, oldView,
-                    null));
-        }
-
-        int minEventDelay = config.getMinEventDelay();
-        if ((!newView.isCurrent()) && minEventDelay<=0) {
-            // if newView is isolated
-            // then we should not send a TOPOLOGY_CHANGED yet - but instead
-            // wait until the view gets resolved. that is achieved by
-            // going into event-delaying and retrying that way.
-            // and if minEventDelay is not configured, then auto-switch
-            // to a 1sec such minEventDelay:
-            minEventDelay=1;
-        }
-        
-        if (minEventDelay<=0) {
-            // otherwise, send the TOPOLOGY_CHANGED now
-            enqueueForAll(Type.TOPOLOGY_CHANGED, oldView, newView);
-            return;
-        }
-
-        // then delay the sending of the next event
-        logger.debug("handlePotentialTopologyChange: delaying event sending to avoid event flooding");
-
-        if (runAfter(minEventDelay /*seconds*/ , new Runnable() {
-
-            public void run() {
-                logger.debug("handlePotentialTopologyChange: acquiring synchronized(lock)...");
-                synchronized(lock) {
-                	logger.debug("handlePotentialTopologyChange: sending delayed event now");
-                	if (!activated) {
-                	    delayedEventPending = false;
-                	    delayedEventPendingFailed = false;
-                		logger.debug("handlePotentialTopologyChange: no longer activated. not sending delayed event");
-                		return;
-                	}
-                    final TopologyViewImpl newView = (TopologyViewImpl) getTopology();
-                    // irrespective of the difference, send the latest topology
-                    // via a topology_changed event (since we already sent a changing)
-                    if (!newView.isCurrent()) {
-                        // if the newView is isolated at this stage we have sent
-                        // TOPOLOGY_CHANGING to the listeners, and they are now waiting
-                        // for TOPOLOGY_CHANGED. But we can't send them that yet..
-                        // we must do a loop via the minEventDelay mechanism and log 
-                        // accordingly
-                        if (runAfter(1/*sec*/, this)) {
-                            logger.warn("handlePotentialTopologyChange: local instance is isolated from topology. Waiting for rejoining...");
-                            return;
-                        }
-                        // otherwise we have to fall back to still sending a TOPOLOGY_CHANGED
-                        // but that's unexpected! (back to delayedEventPending=false..)
-                        delayedEventPendingFailed = true;
-                        logger.warn("handlePotentialTopologyChange: local instance is isolated from topology but failed to trigger delay-job");
-                        return;
-                    }
-
-                    enqueueForAll(Type.TOPOLOGY_CHANGED, DiscoveryServiceImpl.this.oldView, newView);
-                    delayedEventPending = false;
-                    delayedEventPendingFailed = false;
-                }
-            }
-        })) {
-        	delayedEventPending = true;
-            delayedEventPendingFailed = false;
-            logger.debug("handlePotentialTopologyChange: delayed event triggering.");
-            return;
-        } else {
-        	logger.debug("handlePotentialTopologyChange: delaying event triggering did not work for some reason. "
-        	        + "Will be retriggered lazily via later heartbeat.");
-        	delayedEventPending = true;
-        	delayedEventPendingFailed = true;
-        	return;
-        }
-    }
-    
-    private void enqueueForAll(Type eventType, TopologyViewImpl oldView, TopologyViewImpl newView) {
-        if (oldView!=null) {
-            oldView.markOld();
-        }
-        logger.info("enqueueForAll: sending "+eventType+" to all listeners (oldView={}, newView={}).", oldView, newView);
-        for (final TopologyEventListener da : eventListeners) {
-            enqueueAsyncTopologyEvent(da, new TopologyEvent(eventType, oldView, newView));
-        }
-        if (eventType!=Type.TOPOLOGY_CHANGING) {
-            setOldView(newView);
-        }
-        if (heartbeatHandler!=null) {
-            // trigger a heartbeat 'now' to pass it on to the topology asap
-            heartbeatHandler.triggerHeartbeat();
-        }
-    }
-
-    /**
-     * run the runnable after the indicated number of seconds, once.
-     * @return true if the scheduling of the runnable worked, false otherwise
-     */
-    private boolean runAfter(int seconds, final Runnable runnable) {
-        final Scheduler theScheduler = scheduler;
-    	if (theScheduler == null) {
-    		logger.info("runAfter: no scheduler set");
-    		return false;
-    	}
-    	logger.debug("runAfter: trying with scheduler.fireJob");
-    	final Date date = new Date(System.currentTimeMillis() + seconds * 1000);
-		try {
-		    theScheduler.fireJobAt(null, runnable, null, date);
-			return true;
-		} catch (Exception e) {
-			logger.info("runAfter: could not schedule a job: "+e);
-			return false;
-		}
-    }
-
-    /**
      * Internal class caching some provider infos like service id and ranking.
      */
     private final static class ProviderInfo implements Comparable<ProviderInfo> {
@@ -1002,13 +612,27 @@ public class DiscoveryServiceImpl implem
     /**
      * Handle the fact that the topology has likely changed
      */
-    public void handleTopologyChanged() {
-        logger.debug("handleTopologyChanged: calling handlePotentialTopologyChange.");
-        synchronized ( this.lock ) {
-            handlePotentialTopologyChange();
+    public void handlePotentialTopologyChange() {
+        BaseTopologyView t = (BaseTopologyView) getTopology();
+        if (t.isCurrent()) {
+            // if we have a valid view, let the viewStateManager do the
+            // comparison and sending of an event, if necessary
+            viewStateManager.handleNewView(t);
+        } else {
+            // if we don't have a view, then we might have to send
+            // a CHANGING event, let that be decided by the viewStateManager as well
+            viewStateManager.handleChanging();
         }
     }
 
+    /**
+     * Handle the fact that the topology has started to change - inform the listeners asap
+     */
+    public void handleTopologyChanging() {
+        logger.debug("handleTopologyChanging: invoking viewStateManager.handlechanging");
+        viewStateManager.handleChanging();
+    }
+
     /** SLING-2901 : send a TOPOLOGY_CHANGING event and shutdown the service thereafter **/
 	public void forcedShutdown() {
 		synchronized(lock) {
@@ -1022,11 +646,8 @@ public class DiscoveryServiceImpl implem
 	        }
 	        logger.error("forcedShutdown: sending TOPOLOGY_CHANGING to all listeners");
 	        // SLING-4638: make sure the oldView is really marked as old:
-	        oldView.markOld();
-            for (final TopologyEventListener da : eventListeners) {
-                enqueueAsyncTopologyEvent(da, new TopologyEvent(Type.TOPOLOGY_CHANGING, oldView,
-                        null));
-            }
+	        oldView.setNotCurrent();
+	        viewStateManager.handleChanging();
 	        logger.error("forcedShutdown: deactivating DiscoveryService.");
 	        // to make sure no further event is sent after this, flag this service as deactivated
             activated = false;

Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java Mon Oct 12 13:54:14 2015
@@ -280,6 +280,7 @@ public class HeartbeatHandler implements
             // use 'fireJobAt' here, instead of 'fireJob' to make sure the job can always be triggered
             // 'fireJob' checks for a job from the same job-class to already exist
             // 'fireJobAt' though allows to pass a name for the job - which can be made unique, thus does not conflict/already-exist
+            logger.info("triggerHeartbeat: firing job to trigger heartbeat");
             scheduler.fireJobAt(NAME+UUID.randomUUID(), this, null, new Date(System.currentTimeMillis()-1000 /* make sure it gets triggered immediately*/));
         } catch (Exception e) {
             logger.info("triggerHeartbeat: Could not trigger heartbeat: " + e);
@@ -396,6 +397,7 @@ public class HeartbeatHandler implements
             				// sling instance accessing the same repository (ie in the same cluster)
             				// using the same sling.id - hence writing to the same
             				// resource
+            			    discoveryService.handleTopologyChanging();
             				logger.error("issueClusterLocalHeartbeat: SLING-2892: Detected unexpected, concurrent update of: "+
             						myClusterNodePath+" 'lastHeartbeat'. If not done manually, " +
             						"this likely indicates that there is more than 1 instance running in this cluster" +
@@ -415,6 +417,7 @@ public class HeartbeatHandler implements
             	    // someone deleted the resource property
             	    firstHeartbeatWritten = -1;
             	} else if (!runtimeId.equals(readRuntimeId)) {
+            	    discoveryService.handleTopologyChanging();
                     final String slingHomePath = slingSettingsService==null ? "n/a" : slingSettingsService.getSlingHomePath();
                     final String endpointsAsString = getEndpointsAsString();
                     final String readEndpoints = resourceMap.get(PROPERTY_ID_ENDPOINTS, String.class);
@@ -575,6 +578,11 @@ public class HeartbeatHandler implements
         if (winningVoting != null || (numOpenNonWinningVotes > 0)) {
             // then there are votings pending and I shall wait for them to
             // settle
+            
+            // but first: make sure we sent the TOPOLOGY_CHANGING
+            logger.info("doCheckView: there are pending votings, marking topology as changing...");
+            discoveryService.handleTopologyChanging();
+            
         	if (logger.isDebugEnabled()) {
 	            logger.debug("doCheckView: "
 	                    + numOpenNonWinningVotes
@@ -595,6 +603,11 @@ public class HeartbeatHandler implements
             logger.debug("doCheckView: no pending nor winning votes. view is fine. we're all happy.");
             return;
         }
+        
+        // immediately send a TOPOLOGY_CHANGING - could already be sent, but just to be sure
+        logger.info("doCheckView: no matching established view, marking topology as changing");
+        discoveryService.handleTopologyChanging();
+        
     	if (logger.isDebugEnabled()) {
 	        logger.debug("doCheckView: no pending nor winning votes. But: view does not match established or no established yet. Initiating a new voting");
 	        Iterator<String> it = liveInstances.iterator();

Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyChangeHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyChangeHandler.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyChangeHandler.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyChangeHandler.java Mon Oct 12 13:54:14 2015
@@ -131,7 +131,7 @@ public class TopologyChangeHandler imple
 
     /** Inform the DiscoveryServiceImpl that the topology (might) have changed **/
     private void handleTopologyChanged() {
-        discoveryService.handleTopologyChanged();
+        discoveryService.handlePotentialTopologyChange();
     }
 
 }
\ No newline at end of file

Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyViewImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyViewImpl.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyViewImpl.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyViewImpl.java Mon Oct 12 13:54:14 2015
@@ -28,20 +28,17 @@ import org.apache.sling.discovery.Cluste
 import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.discovery.InstanceFilter;
 import org.apache.sling.discovery.TopologyEvent.Type;
-import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Implementation of the topology view
  */
-public class TopologyViewImpl implements TopologyView {
+public class TopologyViewImpl extends BaseTopologyView {
 
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
-    /** Whether or not this topology is considered 'current' / ie currently valid **/
-    private volatile boolean current = true;
-
     /** the instances that are part of this topology **/
     private final Set<InstanceDescription> instances = new HashSet<InstanceDescription>();
 
@@ -116,7 +113,7 @@ public class TopologyViewImpl implements
             return false;
         }
         TopologyViewImpl other = (TopologyViewImpl) obj;
-        if (this.current != other.current) {
+        if (this.isCurrent() != other.isCurrent()) {
             return false;
         }
         Type diff = compareTopology(other);
@@ -135,20 +132,6 @@ public class TopologyViewImpl implements
     }
 
     /**
-     * @see org.apache.sling.discovery.TopologyView#isCurrent()
-     */
-    public boolean isCurrent() {
-        return current;
-    }
-
-    /**
-     * Mark this topology as old
-     */
-    public void markOld() {
-        this.current = false;
-    }
-
-    /**
      * @see org.apache.sling.discovery.TopologyView#getLocalInstance()
      */
     public InstanceDescription getLocalInstance() {
@@ -234,7 +217,12 @@ public class TopologyViewImpl implements
 
     @Override
     public String toString() {
-        return "TopologyViewImpl [current=" + current + ", super.hashCode=" + super.hashCode() +
-                ", instances=" + instances + "]";
+        return "TopologyViewImpl [current=" + isCurrent() + ", num=" + instances.size() + ", instances="
+                + instances + "]";
+    }
+
+    @Override
+    public String getLocalClusterSyncTokenId() {
+        throw new IllegalStateException("no syncToken applicable");
     }
 }

Modified: sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java Mon Oct 12 13:54:14 2015
@@ -97,7 +97,7 @@ public class ClusterTest {
     public void setup() throws Exception {
         final org.apache.log4j.Logger discoveryLogger = LogManager.getRootLogger().getLogger("org.apache.sling.discovery");
         logLevel = discoveryLogger.getLevel();
-        discoveryLogger.setLevel(Level.DEBUG);
+        discoveryLogger.setLevel(Level.TRACE);
         logger.debug("here we are");
         instance1 = Instance.newStandaloneInstance("firstInstance", true);
         instance2 = Instance.newClusterInstance("secondInstance", instance1,
@@ -1355,12 +1355,15 @@ public class ClusterTest {
         // let the instance1 become alone, instance2 is idle
         instance1.getConfig().setHeartbeatTimeout(2);
         instance2.getConfig().setHeartbeatTimeout(2);
+        logger.info("testLongRunningListener : letting instance2 remain silent from now on");
         instance1.runHeartbeatOnce();
         Thread.sleep(1500);
         instance1.runHeartbeatOnce();
         Thread.sleep(1500);
         instance1.runHeartbeatOnce();
         Thread.sleep(1500);
+        instance1.runHeartbeatOnce();
+        logger.info("testLongRunningListener : instance 2 should now be considered dead");
 //        instance1.dumpRepo();
         
         LongRunningListener longRunningListener1 = new LongRunningListener();
@@ -1368,11 +1371,14 @@ public class ClusterTest {
         fastListener2.addExpected(Type.TOPOLOGY_INIT);
         longRunningListener1.assertNoFail();
         assertEquals(1, fastListener2.getRemainingExpectedCount());
+        logger.info("testLongRunningListener : binding longRunningListener1 ...");
         instance1.bindTopologyEventListener(longRunningListener1);
+        logger.info("testLongRunningListener : binding fastListener2 ...");
         instance1.bindTopologyEventListener(fastListener2);
-        Thread.sleep(500); // SLING-4755: async event sending requires some minimal wait time nowadays
-        assertTrue(longRunningListener1.initReceived);
+        logger.info("testLongRunningListener : waiting a bit for longRunningListener1 to receive the TOPOLOGY_INIT event");
+        Thread.sleep(2500); // SLING-4755: async event sending requires some minimal wait time nowadays
         assertEquals(0, fastListener2.getRemainingExpectedCount());
+        assertTrue(longRunningListener1.initReceived);
         
         // after INIT, now do an actual change where listener1 will do a long-running handling
         fastListener2.addExpected(Type.TOPOLOGY_CHANGING);

Modified: sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java Mon Oct 12 13:54:14 2015
@@ -194,6 +194,7 @@ public class SingleInstanceTest {
 
         AssertingTopologyEventListener assertingTopologyEventListener = new AssertingTopologyEventListener();
         assertingTopologyEventListener.addExpected(Type.TOPOLOGY_INIT);
+        logger.info("testTopologyEventListeners: binding the event listener");
         instance.bindTopologyEventListener(assertingTopologyEventListener);
         Thread.sleep(500); // SLING-4755: async event sending requires some minimal wait time nowadays
         assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
@@ -210,6 +211,8 @@ public class SingleInstanceTest {
         instance.bindPropertyProvider(pp, propertyName);
         logger.info("testTopologyEventListeners: 3rd sleep 1.5s");
         Thread.sleep(1500);
+        logger.info("testTopologyEventListeners: dumping due to failure: ");
+        assertingTopologyEventListener.dump();
         assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
         // we can only assume that the getProperty was called at least once - it
         // could be called multiple times though..
@@ -226,14 +229,14 @@ public class SingleInstanceTest {
         logger.info("testTopologyEventListeners: 4th sleep 2s");
         Thread.sleep(2000);
         assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
-        assertEquals(2, pp.getGetCnt());
+        assertEquals(1, pp.getGetCnt());
 
         // a heartbeat repeat should not result in another call though
         instance.runHeartbeatOnce();
         logger.info("testTopologyEventListeners: 5th sleep 2s");
         Thread.sleep(2000);
         assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
-        assertEquals(3, pp.getGetCnt());
+        assertEquals(2, pp.getGetCnt());
         logger.info("testTopologyEventListeners: done");
     }
 
@@ -268,6 +271,7 @@ public class SingleInstanceTest {
         instance.runHeartbeatOnce();
         Thread.sleep(1000);
         instance.dumpRepo();
+        ada.dump();
         assertEquals(0, ada.getUnexpectedCount());
         assertEquals(1, ada.getEvents().size());
         TopologyEvent initEvent = ada.getEvents().remove(0);

Modified: sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/TopologyEventTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/TopologyEventTest.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/TopologyEventTest.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/TopologyEventTest.java Mon Oct 12 13:54:14 2015
@@ -138,16 +138,6 @@ public class TopologyEventTest {
         assertEquals(0, l1Two.getRemainingExpectedCount()); // the expected one
         assertEquals(0, l1Two.getUnexpectedCount());
         
-        // one heartbeat doesn't change the history yet
-        logger.info("testDelayedInitEvent: an additional heartbeat shouldn't trigger any event for now");
-        instance1.runHeartbeatOnce();
-        instance2.runHeartbeatOnce();
-        assertEquals(1, l1.getEvents().size()); // one event
-        assertEquals(0, l1.getUnexpectedCount());
-        assertEquals(0, l2.getEvents().size());
-        assertEquals(0, l2.getUnexpectedCount());
-        assertEquals(1, l1Two.getEvents().size());
-        assertEquals(0, l1Two.getUnexpectedCount());
         
         // the second & third heartbeat though triggers the voting etc
         logger.info("testDelayedInitEvent: two more heartbeats should trigger events");
@@ -158,6 +148,9 @@ public class TopologyEventTest {
         instance1.runHeartbeatOnce();
         instance2.runHeartbeatOnce();
         Thread.sleep(500);
+        instance1.runHeartbeatOnce();
+        instance2.runHeartbeatOnce();
+        Thread.sleep(500);
         instance1.runHeartbeatOnce();
         instance2.runHeartbeatOnce();
         logger.info("testDelayedInitEvent: instance1: "+instance1.slingId);

Modified: sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/helpers/AssertingTopologyEventListener.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/helpers/AssertingTopologyEventListener.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/helpers/AssertingTopologyEventListener.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/helpers/AssertingTopologyEventListener.java Mon Oct 12 13:54:14 2015
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNotNull;
 
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
@@ -47,12 +48,9 @@ public class AssertingTopologyEventListe
         this.debugInfo = debugInfo;
     }
     
-    public void fail(String errorMsg) {
-        this.errorMsg = errorMsg;
-    }
-    
-    public String getErrorMsg() {
-        return errorMsg;
+    @Override
+    public String toString() {
+        return super.toString()+"-[debugInfo="+debugInfo+"]";
     }
     
     private List<TopologyEvent> events_ = new LinkedList<TopologyEvent>();
@@ -147,4 +145,16 @@ public class AssertingTopologyEventListe
     public int getUnexpectedCount() {
         return unexpectedEvents_.size();
     }
+
+    /**
+     * Logs a summary of the received, unexpected and still-expected events (test diagnostics only).
+     */
+    public void dump() {
+        final StringBuilder ue = new StringBuilder();
+        for (final TopologyEvent topologyEvent : unexpectedEvents_) {
+            ue.append(topologyEvent).append(", ");
+        }
+        logger.info("dump: got " + events_.size() + " events, " + unexpectedEvents_.size() + " (details: " + ue
+                + ") thereof unexpected. My list of expected events contains " + expectedEvents.size());
+    }
 }
\ No newline at end of file

Modified: sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatTest.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatTest.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatTest.java Mon Oct 12 13:54:14 2015
@@ -30,6 +30,7 @@ import java.util.Calendar;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -55,12 +56,16 @@ import org.apache.sling.discovery.impl.s
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class HeartbeatTest {
     
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
     class SimpleTopologyEventListener implements TopologyEventListener {
 
+        private List<TopologyEvent> events = new LinkedList<TopologyEvent>();
         private TopologyEvent lastEvent;
         private int eventCount;
         private final String name;
@@ -71,6 +76,7 @@ public class HeartbeatTest {
         
         @Override
         public void handleTopologyEvent(TopologyEvent event) {
+            events.add(event);
             String msg = event.toString();
             TopologyView newView = event.getNewView();
             switch(event.getType()) {
@@ -119,7 +125,7 @@ public class HeartbeatTest {
     public void setup() throws Exception {
         final org.apache.log4j.Logger discoveryLogger = LogManager.getRootLogger().getLogger("org.apache.sling.discovery");
         logLevel = discoveryLogger.getLevel();
-        discoveryLogger.setLevel(Level.DEBUG);
+        discoveryLogger.setLevel(Level.TRACE);
     }
     
     @After
@@ -147,6 +153,7 @@ public class HeartbeatTest {
     }
     
     public void doTestPartitioning(boolean scheduler) throws Throwable {
+        logger.info("doTestPartitioning: creating slowMachine...");
         Instance slowMachine = Instance.newStandaloneInstance("/var/discovery/impl/", "slow", true, 10 /*10sec timeout*/, 
                 999 /* 999sec interval: to disable it*/, 0, UUID.randomUUID().toString());
         assertEquals(1, slowMachine.getDiscoveryService().getTopology().getInstances().size());
@@ -155,6 +162,8 @@ public class HeartbeatTest {
         Thread.sleep(10); // wait 10ms to ensure 'slowMachine' has the lowerst leaderElectionId (to become leader)
         SimpleTopologyEventListener slowListener = new SimpleTopologyEventListener("slow");
         slowMachine.bindTopologyEventListener(slowListener);
+        
+        logger.info("doTestPartitioning: creating fastMachine1...");
         Instance fastMachine1 = Instance.newClusterInstance("/var/discovery/impl/", "fast1", slowMachine, false, 10, 1, 0);
         assertEquals(1, fastMachine1.getDiscoveryService().getTopology().getInstances().size());
         assertEquals(fastMachine1.getSlingId(), fastMachine1.getDiscoveryService().getTopology().getInstances().iterator().next().getSlingId());
@@ -162,18 +171,24 @@ public class HeartbeatTest {
         Thread.sleep(10); // wait 10ms to ensure 'fastMachine1' has the 2nd lowerst leaderElectionId (to become leader during partitioning)
         SimpleTopologyEventListener fastListener1 = new SimpleTopologyEventListener("fast1");
         fastMachine1.bindTopologyEventListener(fastListener1);
+
+        logger.info("doTestPartitioning: creating fastMachine2...");
         Instance fastMachine2 = Instance.newClusterInstance("/var/discovery/impl/", "fast2", slowMachine, false, 10, 1, 0);
         assertEquals(1, fastMachine2.getDiscoveryService().getTopology().getInstances().size());
         assertEquals(fastMachine2.getSlingId(), fastMachine2.getDiscoveryService().getTopology().getInstances().iterator().next().getSlingId());
         instances.add(fastMachine2);
         SimpleTopologyEventListener fastListener2 = new SimpleTopologyEventListener("fast2");
         fastMachine2.bindTopologyEventListener(fastListener2);
+
+        logger.info("doTestPartitioning: creating fastMachine3...");
         Instance fastMachine3 = Instance.newClusterInstance("/var/discovery/impl/", "fast3", slowMachine, false, 10, 1, 0);
         assertEquals(1, fastMachine3.getDiscoveryService().getTopology().getInstances().size());
         assertEquals(fastMachine3.getSlingId(), fastMachine3.getDiscoveryService().getTopology().getInstances().iterator().next().getSlingId());
         instances.add(fastMachine3);
         SimpleTopologyEventListener fastListener3 = new SimpleTopologyEventListener("fast3");
         fastMachine3.bindTopologyEventListener(fastListener3);
+
+        logger.info("doTestPartitioning: creating fastMachine4...");
         Instance fastMachine4 = Instance.newClusterInstance("/var/discovery/impl/", "fast4", slowMachine, false, 10, 1, 0);
         assertEquals(1, fastMachine4.getDiscoveryService().getTopology().getInstances().size());
         assertEquals(fastMachine4.getSlingId(), fastMachine4.getDiscoveryService().getTopology().getInstances().iterator().next().getSlingId());
@@ -181,6 +196,7 @@ public class HeartbeatTest {
         SimpleTopologyEventListener fastListener4 = new SimpleTopologyEventListener("fast4");
         fastMachine4.bindTopologyEventListener(fastListener4);
         
+        logger.info("doTestPartitioning: letting heartbeats be sent by all instances for a few loops...");
         HeartbeatHandler hhSlow = slowMachine.getHeartbeatHandler();
         for(int i=0; i<3; i++) {
             hhSlow.issueHeartbeat();
@@ -199,6 +215,7 @@ public class HeartbeatTest {
         }
         
         // at this stage the 4 fast plus the slow instance should all see each other
+        logger.info("doTestPartitioning: all 4 instances should have agreed on seeing each other");
         assertNotNull(fastListener1.getLastEvent());
         assertEquals(TopologyEvent.Type.TOPOLOGY_INIT, fastListener1.getLastEvent().getType());
         assertEquals(5, fastListener1.getLastEvent().getNewView().getInstances().size());
@@ -221,6 +238,7 @@ public class HeartbeatTest {
         assertTrue(slowListener.getLastEvent().getNewView().getLocalInstance().isLeader());
         
         // after 12sec the slow instance' heartbeat should have timed out
+        logger.info("doTestPartitioning: letting slowMachine NOT send any heartbeats for 12sec, only the fast ones do...");
         for(int i=0; i<12; i++) {
             if (!scheduler) {
                 fastMachine1.getHeartbeatHandler().issueHeartbeat();
@@ -234,10 +252,13 @@ public class HeartbeatTest {
             }
             Thread.sleep(1000);
         }
+        logger.info("doTestPartitioning: this should now have decoupled slowMachine from the other 4...");
         
         // so the fast listeners should only see 4 instances remaining
         for(int i=0; i<7; i++) {
+            logger.info("doTestPartitioning: sleeping 2sec...");
             Thread.sleep(2000);
+            logger.info("doTestPartitioning: the 4 fast machines should all just see themselves...");
             assertEquals(TopologyEvent.Type.TOPOLOGY_CHANGED, fastListener1.getLastEvent().getType());
             assertEquals(4, fastListener1.getLastEvent().getNewView().getInstances().size());
             assertEquals(TopologyEvent.Type.TOPOLOGY_CHANGED, fastListener2.getLastEvent().getType());
@@ -254,7 +275,8 @@ public class HeartbeatTest {
 
             // and the slow instance should be isolated
             assertFalse(slowMachine.getDiscoveryService().getTopology().isCurrent());
-            assertEquals(5, slowMachine.getDiscoveryService().getTopology().getInstances().size());
+            // however we can't really make any assertions on how many instances a non-current topology contains...
+            // assertEquals(5, slowMachine.getDiscoveryService().getTopology().getInstances().size());
             if (i==0) {
                 assertEquals(TopologyEvent.Type.TOPOLOGY_INIT, slowListener.getLastEvent().getType());
             } else {
@@ -272,7 +294,8 @@ public class HeartbeatTest {
             TopologyView slowTopo = slowMachine.getDiscoveryService().getTopology();
             assertNotNull(slowTopo);
             assertFalse(slowTopo.isCurrent());
-            assertEquals(5, slowTopo.getInstances().size());
+            // again, can't make any assertion on how many instances the slow non-current view contains...
+            //assertEquals(5, slowTopo.getInstances().size());
             if (!scheduler) {
                 fastMachine1.getHeartbeatHandler().issueHeartbeat();
                 fastMachine1.getHeartbeatHandler().checkView();
@@ -341,11 +364,13 @@ public class HeartbeatTest {
     }
 
     public void doTestSlowAndFastMachine() throws Throwable {
+        logger.info("doTestSlowAndFastMachine: creating slowMachine... (w/o heartbeat runner)");
         Instance slowMachine = Instance.newStandaloneInstance("/var/discovery/impl/", "slow", true, 5 /*5sec timeout*/, 
                 999 /* 999sec interval: to disable it*/, 0, UUID.randomUUID().toString());
         instances.add(slowMachine);
         SimpleTopologyEventListener slowListener = new SimpleTopologyEventListener("slow");
         slowMachine.bindTopologyEventListener(slowListener);
+        logger.info("doTestSlowAndFastMachine: creating fastMachine... (w/o heartbeat runner)");
         Instance fastMachine = Instance.newClusterInstance("/var/discovery/impl/", "fast", slowMachine, false, 5, 999, 0);
         instances.add(fastMachine);
         SimpleTopologyEventListener fastListener = new SimpleTopologyEventListener("fast");
@@ -354,12 +379,14 @@ public class HeartbeatTest {
         HeartbeatHandler hhFast = fastMachine.getHeartbeatHandler();
         
         Thread.sleep(1000);
+        logger.info("doTestSlowAndFastMachine: no event should have been triggered yet");
         assertFalse(fastMachine.getDiscoveryService().getTopology().isCurrent());
         assertFalse(slowMachine.getDiscoveryService().getTopology().isCurrent());
         assertNull(fastListener.getLastEvent());
         assertNull(slowListener.getLastEvent());
 
         // make few rounds of heartbeats so that the two instances see each other
+        logger.info("doTestSlowAndFastMachine: send a couple of heartbeats to connect the two..");
         for(int i=0; i<5; i++) {
             hhSlow.issueHeartbeat();
             hhSlow.checkView();
@@ -367,6 +394,7 @@ public class HeartbeatTest {
             hhFast.checkView();
             Thread.sleep(100);
         }
+        logger.info("doTestSlowAndFastMachine: now the two instances should be connected.");
         slowMachine.dumpRepo();
         
         assertEquals(2, slowMachine.getDiscoveryService().getTopology().getInstances().size());
@@ -377,12 +405,15 @@ public class HeartbeatTest {
         assertEquals(1, slowListener.getEventCount());
         
         // now let the slow machine be slow while the fast one updates as expected
+        logger.info("doTestSlowAndFastMachine: last heartbeat of slowMachine.");
         hhSlow.issueHeartbeat();
+        logger.info("doTestSlowAndFastMachine: while the fastMachine still sends heartbeats...");
         for(int i=0; i<6; i++) {
             Thread.sleep(1500);
             hhFast.issueHeartbeat();
             hhFast.checkView();
         }
+        logger.info("doTestSlowAndFastMachine: now the fastMachine should have decoupled the slow one");
         fastMachine.dumpRepo();
         hhFast.checkView(); // one more for the start of the vote
         fastMachine.dumpRepo();
@@ -398,8 +429,10 @@ public class HeartbeatTest {
         assertFalse(topo.isCurrent());
         
         // after those 6 sec, hhSlow does the check (6sec between heartbeat and check)
+        logger.info("doTestSlowAndFastMachine: slowMachine is going to do a checkView next - and will detect being decoupled");
         hhSlow.checkView();
         slowMachine.dumpRepo();
+        logger.info("doTestSlowAndFastMachine: slowMachine is going to also do a heartbeat next");
         hhSlow.issueHeartbeat();
         assertEquals(TopologyEvent.Type.TOPOLOGY_CHANGED, fastListener.getLastEvent().getType());
         assertEquals(3, fastListener.getEventCount());
@@ -407,6 +440,7 @@ public class HeartbeatTest {
         assertEquals(2, slowListener.getEventCount());
         Thread.sleep(8000);
         // even after 8 sec the slow lsitener did not send a TOPOLOGY_CHANGED yet
+        logger.info("doTestSlowAndFastMachine: after another 8 sec of silence from slowMachine, it should still remain in CHANGING state");
         assertEquals(TopologyEvent.Type.TOPOLOGY_CHANGING, slowListener.getLastEvent().getType());
         assertFalse(slowMachine.getDiscoveryService().getTopology().isCurrent());
         assertEquals(2, slowListener.getEventCount());
@@ -415,6 +449,7 @@ public class HeartbeatTest {
         assertEquals(3, fastListener.getEventCount());
         
         // make few rounds of heartbeats so that the two instances see each other again
+        logger.info("doTestSlowAndFastMachine: now let both fast and slow issue heartbeats...");
         for(int i=0; i<4; i++) {
             hhFast.issueHeartbeat();
             hhFast.checkView();
@@ -422,6 +457,7 @@ public class HeartbeatTest {
             hhSlow.checkView();
             Thread.sleep(1000);
         }
+        logger.info("doTestSlowAndFastMachine: by now the two should have joined");
         
         // this should have put the two together again
         // even after 8 sec the slow lsitener did not send a TOPOLOGY_CHANGED yet
@@ -489,15 +525,20 @@ public class HeartbeatTest {
      */
     @Test
     public void testVotingLoop() throws Throwable {
+        logger.info("testVotingLoop: creating slowMachine1...");
         Instance slowMachine1 = Instance.newStandaloneInstance("/var/discovery/impl/", "slow1", true, 600 /*600sec timeout*/, 
                 999 /* 999sec interval: to disable it*/, 0, UUID.randomUUID().toString());
         instances.add(slowMachine1);
         SimpleTopologyEventListener slowListener1 = new SimpleTopologyEventListener("slow1");
         slowMachine1.bindTopologyEventListener(slowListener1);
+        
+        logger.info("testVotingLoop: creating slowMachine2...");
         Instance slowMachine2 = Instance.newClusterInstance("/var/discovery/impl/", "slow2", slowMachine1, false, 600, 999, 0);
         instances.add(slowMachine2);
         SimpleTopologyEventListener slowListener2 = new SimpleTopologyEventListener("slow2");
         slowMachine2.bindTopologyEventListener(slowListener2);
+        
+        logger.info("testVotingLoop: creating fastMachine...");
         Instance fastMachine = Instance.newClusterInstance("/var/discovery/impl/", "fast", slowMachine1, false, 600, 999, 0);
         instances.add(fastMachine);
         SimpleTopologyEventListener fastListener = new SimpleTopologyEventListener("fast");
@@ -507,6 +548,7 @@ public class HeartbeatTest {
         HeartbeatHandler hhFast = fastMachine.getHeartbeatHandler();
         
         Thread.sleep(1000);
+        logger.info("testVotingLoop: after some initial 1sec sleep no event should yet have been sent");
         assertFalse(fastMachine.getDiscoveryService().getTopology().isCurrent());
         assertFalse(slowMachine1.getDiscoveryService().getTopology().isCurrent());
         assertFalse(slowMachine2.getDiscoveryService().getTopology().isCurrent());
@@ -515,9 +557,11 @@ public class HeartbeatTest {
         assertNull(slowListener2.getLastEvent());
 
         // prevent the slow machine from voting
+        logger.info("testVotingLoop: stopping voting of slowMachine1...");
         slowMachine1.stopVoting();
         
         // now let all issue a heartbeat
+        logger.info("testVotingLoop: letting slow1, slow2 and fast all issue 1 heartbeat");
         hhSlow1.issueHeartbeat();
         hhSlow2.issueHeartbeat();
         hhFast.issueHeartbeat();
@@ -525,11 +569,14 @@ public class HeartbeatTest {
         // now let the fast one start a new voting, to which
         // only the fast one will vote, the slow one doesn't.
         // that will cause a voting loop
+        logger.info("testVotingLoop: let the fast one do a checkView, thus initiate a voting");
         hhFast.checkView();
         
         Calendar previousVotedAt = null;
         for(int i=0; i<5; i++) {
+            logger.info("testVotingLoop: sleeping 1sec...");
             Thread.sleep(1000);
+            logger.info("testVotingLoop: check to see that there is no voting loop...");
             // now check the ongoing votings
             ResourceResolverFactory factory = fastMachine.getResourceResolverFactory();
             ResourceResolver resourceResolver = factory