You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2015/10/12 15:54:15 UTC
svn commit: r1708119 - in /sling/trunk/bundles/extensions/discovery/impl: ./
src/main/java/org/apache/sling/discovery/impl/
src/main/java/org/apache/sling/discovery/impl/common/heartbeat/
src/main/java/org/apache/sling/discovery/impl/topology/ src/test...
Author: stefanegli
Date: Mon Oct 12 13:54:14 2015
New Revision: 1708119
URL: http://svn.apache.org/viewvc?rev=1708119&view=rev
Log:
SLING-5094 : refactoring DiscoveryServiceImpl to use ViewStateManager of discovery.commons
Modified:
sling/trunk/bundles/extensions/discovery/impl/pom.xml
sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java
sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyChangeHandler.java
sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyViewImpl.java
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/TopologyEventTest.java
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/helpers/AssertingTopologyEventListener.java
sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatTest.java
Modified: sling/trunk/bundles/extensions/discovery/impl/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/pom.xml?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/pom.xml (original)
+++ sling/trunk/bundles/extensions/discovery/impl/pom.xml Mon Oct 12 13:54:14 2015
@@ -128,6 +128,12 @@
</dependency>
<dependency>
<groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.commons</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.api</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java Mon Oct 12 13:54:14 2015
@@ -20,20 +20,18 @@ package org.apache.sling.discovery.impl;
import java.net.URL;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Date;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Hashtable;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -53,10 +51,12 @@ import org.apache.sling.discovery.Cluste
import org.apache.sling.discovery.DiscoveryService;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.PropertyProvider;
-import org.apache.sling.discovery.TopologyEvent;
-import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.ViewStateManager;
+import org.apache.sling.discovery.commons.providers.impl.ViewStateManagerFactory;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
import org.apache.sling.discovery.impl.cluster.ClusterViewService;
import org.apache.sling.discovery.impl.cluster.UndefinedClusterViewException;
import org.apache.sling.discovery.impl.cluster.UndefinedClusterViewException.Reason;
@@ -85,157 +85,11 @@ public class DiscoveryServiceImpl implem
private final static Logger logger = LoggerFactory.getLogger(DiscoveryServiceImpl.class);
- /** SLING-4755 : encapsulates an event that yet has to be sent (asynchronously) for a particular listener **/
- private final static class AsyncEvent {
- private final TopologyEventListener listener;
- private final TopologyEvent event;
- AsyncEvent(TopologyEventListener listener, TopologyEvent event) {
- if (listener==null) {
- throw new IllegalArgumentException("listener must not be null");
- }
- if (event==null) {
- throw new IllegalArgumentException("event must not be null");
- }
- this.listener = listener;
- this.event = event;
- }
- @Override
- public String toString() {
- return "an AsyncEvent[event="+event+", listener="+listener+"]";
- }
- }
-
- /**
- * SLING-4755 : background runnable that takes care of asynchronously sending events.
- * <p>
- * API is: enqueue() puts a listener-event tuple onto the internal Q, which
- * is processed in a loop in run that does so (uninterruptably, even catching
- * Throwables to be 'very safe', but sleeps 5sec if an Error happens) until
- * flushThenStop() is called - which puts the sender in a state where any pending
- * events are still sent (flush) but then stops automatically. The argument of
- * using flush before stop is that the event was originally meant to be sent
- * before the bundle was stopped - thus just because the bundle is stopped
- * doesn't undo the event and it still has to be sent. That obviously can
- * mean that listeners can receive a topology event after deactivate. But I
- * guess that was already the case before the change to become asynchronous.
- */
- private final static class AsyncEventSender implements Runnable {
-
- /** stopped is always false until flushThenStop is called **/
- private boolean stopped = false;
-
- /** eventQ contains all AsyncEvent objects that have yet to be sent - in order to be sent **/
- private final List<AsyncEvent> eventQ = new LinkedList<AsyncEvent>();
-
- /** Enqueues a particular event for asynchronous sending to a particular listener **/
- void enqueue(TopologyEventListener listener, TopologyEvent event) {
- final AsyncEvent asyncEvent = new AsyncEvent(listener, event);
- synchronized(eventQ) {
- eventQ.add(asyncEvent);
- if (logger.isDebugEnabled()) {
- logger.debug("enqueue: enqueued event {} for async sending (Q size: {})", asyncEvent, eventQ.size());
- }
- eventQ.notifyAll();
- }
- }
-
- /**
- * Stops the AsyncEventSender as soon as the queue is empty
- */
- void flushThenStop() {
- synchronized(eventQ) {
- logger.info("AsyncEventSender.flushThenStop: flushing (size: {}) & stopping...", eventQ.size());
- stopped = true;
- eventQ.notifyAll();
- }
- }
-
- /** Main worker loop that dequeues from the eventQ and calls sendTopologyEvent with each **/
- public void run() {
- logger.info("AsyncEventSender.run: started.");
- try{
- while(true) {
- try{
- final AsyncEvent asyncEvent;
- synchronized(eventQ) {
- while(!stopped && eventQ.isEmpty()) {
- try {
- eventQ.wait();
- } catch (InterruptedException e) {
- // issue a log debug but otherwise continue
- logger.debug("AsyncEventSender.run: interrupted while waiting for async events");
- }
- }
- if (stopped) {
- if (eventQ.isEmpty()) {
- // then we have flushed, so we can now finally stop
- logger.info("AsyncEventSender.run: flush finished. stopped.");
- return;
- } else {
- // otherwise the eventQ is not yet empty, so we are still in flush mode
- logger.info("AsyncEventSender.run: flushing another event. (pending {})", eventQ.size());
- }
- }
- asyncEvent = eventQ.remove(0);
- if (logger.isDebugEnabled()) {
- logger.debug("AsyncEventSender.run: dequeued event {}, remaining: {}", asyncEvent, eventQ.size());
- }
- }
- if (asyncEvent!=null) {
- sendTopologyEvent(asyncEvent);
- }
- } catch(Throwable th) {
- // Even though we should never catch Error or RuntimeException
- // here's the thinking about doing it anyway:
- // * in case of a RuntimeException that would be less dramatic
- // and catching it is less of an issue - we rather want
- // the background thread to be able to continue than
- // having it finished just because of a RuntimeException
- // * catching an Error is of course not so nice.
- // however, should we really give up this thread even in
- // case of an Error? It could be an OOM or some other
- // nasty one, for sure. But even if. Chances are that
- // other parts of the system would also get that Error
- // if it is very dramatic. If not, then catching it
- // sounds feasible.
- // My two cents..
- // the goal is to avoid quitting the AsyncEventSender thread
- logger.error("AsyncEventSender.run: Throwable occurred. Sleeping 5sec. Throwable: "+th, th);
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- logger.warn("AsyncEventSender.run: interrupted while sleeping");
- }
- }
- }
- } finally {
- logger.info("AsyncEventSender.run: quits (finally).");
- }
- }
-
- /** Actual sending of the asynchronous event - catches RuntimeExceptions a listener can send. (Error is caught outside) **/
- private void sendTopologyEvent(AsyncEvent asyncEvent) {
- final TopologyEventListener listener = asyncEvent.listener;
- final TopologyEvent event = asyncEvent.event;
- logger.debug("sendTopologyEvent: start: listener: {}, event: {}", listener, event);
- try{
- listener.handleTopologyEvent(event);
- } catch(final Exception e) {
- logger.warn("sendTopologyEvent: handler threw exception. handler: "+listener+", exception: "+e, e);
- }
- logger.debug("sendTopologyEvent: start: listener: {}, event: {}", listener, event);
- }
-
- }
-
@Reference
private SlingSettingsService settingsService;
@Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, policy = ReferencePolicy.DYNAMIC, referenceInterface = TopologyEventListener.class)
private TopologyEventListener[] eventListeners = new TopologyEventListener[0];
-
- /** SLING-5030 : this map contains the event last sent to each listener to prevent duplicate CHANGING events when scheduler is broken**/
- private Map<TopologyEventListener,TopologyEvent.Type> lastEventMap = new HashMap<TopologyEventListener, TopologyEvent.Type>();
/**
* All property providers.
@@ -252,9 +106,6 @@ public class DiscoveryServiceImpl implem
**/
private boolean activated = false;
- /** SLING-3750 : set when activate() could not send INIT events due to being in isolated mode **/
- private boolean initEventDelayed = false;
-
@Reference
private ResourceResolverFactory resourceResolverFactory;
@@ -282,20 +133,24 @@ public class DiscoveryServiceImpl implem
/** the old view previously valid and sent to the TopologyEventListeners **/
private TopologyViewImpl oldView;
- /**
- * whether or not there is a delayed event sending pending.
- * Marked volatile to allow getTopology() to read this without need for
- * synchronized(lock) (which would be deadlock-prone). (introduced with SLING-4638).
- **/
- private volatile boolean delayedEventPending = false;
-
- /** used to continue functioning when scheduler is broken **/
- private volatile boolean delayedEventPendingFailed = false;
-
private ServiceRegistration mbeanRegistration;
- /** SLING-4755 : reference to the background AsyncEventSender. Started/stopped in activate/deactivate **/
- private AsyncEventSender asyncEventSender;
+ private ViewStateManager viewStateManager;
+
+ private ReentrantLock viewStateManagerLock;
+
+ public DiscoveryServiceImpl() {
+ viewStateManagerLock = new ReentrantLock();
+ final ConsistencyService consistencyService = new ConsistencyService() {
+
+ @Override
+ public void sync(BaseTopologyView view, Runnable callback) {
+ logger.debug("sync: no syncToken applicable");
+ callback.run();
+ }
+ };
+ viewStateManager = ViewStateManagerFactory.newViewStateManager(viewStateManagerLock, consistencyService);
+ }
protected void registerMBean(BundleContext bundleContext) {
if (this.mbeanRegistration!=null) {
@@ -323,6 +178,7 @@ public class DiscoveryServiceImpl implem
if (view==null) {
throw new IllegalArgumentException("view must not be null");
}
+ logger.debug("setOldView: oldView is now: {}", oldView);
oldView = view;
}
@@ -342,6 +198,10 @@ public class DiscoveryServiceImpl implem
slingId = settingsService.getSlingId();
+ if (config.getMinEventDelay()>0) {
+ viewStateManager.installMinEventDelayHandler(this, scheduler, config.getMinEventDelay());
+ }
+
final String isolatedClusterId = UUID.randomUUID().toString();
{
// create a pre-voting/isolated topologyView which would be used
@@ -357,11 +217,11 @@ public class DiscoveryServiceImpl implem
col.add(isolatedInstance);
final TopologyViewImpl topology = new TopologyViewImpl();
topology.addInstances(col);
- topology.markOld();
+ topology.setNotCurrent();
setOldView(topology);
}
setOldView((TopologyViewImpl) getTopology());
- oldView.markOld();
+ oldView.setNotCurrent();
// make sure the first heartbeat is issued as soon as possible - which
// is right after this service starts. since the two (discoveryservice
@@ -369,35 +229,25 @@ public class DiscoveryServiceImpl implem
// is passed on to the heartbeatHandler in this initialize call).
heartbeatHandler.initialize(this, isolatedClusterId);
- final TopologyEventListener[] registeredServices;
- synchronized (lock) {
- // SLING-4755 : start the asyncEventSender in the background
- // will be stopped in deactivate (at which point
- // all pending events will still be sent but no
- // new events can be enqueued)
- asyncEventSender = new AsyncEventSender();
- Thread th = new Thread(asyncEventSender);
- th.setName("Discovery-AsyncEventSender");
- th.setDaemon(true);
- th.start();
+ viewStateManagerLock.lock();
+ try{
+ viewStateManager.handleActivated();
- registeredServices = this.eventListeners;
doUpdateProperties();
TopologyViewImpl newView = (TopologyViewImpl) getTopology();
- if (!newView.isCurrent()) {
+ if (newView.isCurrent()) {
+ viewStateManager.handleNewView(newView);
+ } else {
// SLING-3750: just issue a log.info about the delaying
logger.info("activate: this instance is in isolated mode and must yet finish voting before it can send out TOPOLOGY_INIT.");
- initEventDelayed = true;
- } else {
- final TopologyEvent event = new TopologyEvent(Type.TOPOLOGY_INIT, null,
- newView);
- for (final TopologyEventListener da : registeredServices) {
- enqueueAsyncTopologyEvent(da, event);
- }
}
activated = true;
setOldView(newView);
+ } finally {
+ if (viewStateManagerLock!=null) {
+ viewStateManagerLock.unlock();
+ }
}
URL[] topologyConnectorURLs = config.getTopologyConnectorURLs();
@@ -420,40 +270,20 @@ public class DiscoveryServiceImpl implem
logger.debug("DiscoveryServiceImpl activated.");
}
- private void enqueueAsyncTopologyEvent(final TopologyEventListener da, final TopologyEvent event) {
- if (logger.isDebugEnabled()) {
- logger.debug("enqueueAsyncTopologyEvent: sending topologyEvent {}, to {}", event, da);
- }
- if (asyncEventSender==null) {
- // this should never happen - sendTopologyEvent should only be called
- // when activated
- logger.warn("enqueueAsyncTopologyEvent: asyncEventSender is null, cannot send event ({}, {})!", da, event);
- return;
- }
- if (lastEventMap.get(da)==event.getType() && event.getType()==Type.TOPOLOGY_CHANGING) {
- // don't sent TOPOLOGY_CHANGING twice
- logger.debug("enqueueAsyncTopologyEvent: listener already got TOPOLOGY_CHANGING: {}", da);
- return;
- }
- asyncEventSender.enqueue(da, event);
- lastEventMap.put(da, event.getType());
- if (logger.isDebugEnabled()) {
- logger.debug("enqueueAsyncTopologyEvent: sending topologyEvent {}, to {}", event, da);
- }
- }
-
/**
* Deactivate this service
*/
@Deactivate
protected void deactivate() {
logger.debug("DiscoveryServiceImpl deactivated.");
- synchronized (lock) {
+ viewStateManagerLock.lock();
+ try{
+ viewStateManager.handleDeactivated();
+
activated = false;
- if (asyncEventSender!=null) {
- // it should always be not-null though
- asyncEventSender.flushThenStop();
- asyncEventSender = null;
+ } finally {
+ if (viewStateManagerLock!=null) {
+ viewStateManagerLock.unlock();
}
}
try{
@@ -470,45 +300,14 @@ public class DiscoveryServiceImpl implem
* bind a topology event listener
*/
protected void bindTopologyEventListener(final TopologyEventListener eventListener) {
-
- logger.debug("bindTopologyEventListener: Binding TopologyEventListener {}",
- eventListener);
-
- synchronized (lock) {
- final List<TopologyEventListener> currentList = new ArrayList<TopologyEventListener>(
- Arrays.asList(eventListeners));
- currentList.add(eventListener);
- this.eventListeners = currentList
- .toArray(new TopologyEventListener[currentList.size()]);
- if (activated && !initEventDelayed) {
- final TopologyViewImpl topology = (TopologyViewImpl) getTopology();
- if (delayedEventPending) {
- // that means that for other TopologyEventListeners that were already bound
- // and in general: the topology is currently CHANGING
- // so we must reflect this with the isCurrent() flag (SLING-4638)
- topology.markOld();
- }
- enqueueAsyncTopologyEvent(eventListener, new TopologyEvent(
- Type.TOPOLOGY_INIT, null, topology));
- }
- }
+ viewStateManager.bind(eventListener);
}
/**
* Unbind a topology event listener
*/
protected void unbindTopologyEventListener(final TopologyEventListener eventListener) {
-
- logger.debug("unbindTopologyEventListener: Releasing TopologyEventListener {}",
- eventListener);
-
- synchronized (lock) {
- final List<TopologyEventListener> currentList = new ArrayList<TopologyEventListener>(
- Arrays.asList(eventListeners));
- currentList.remove(eventListener);
- this.eventListeners = currentList
- .toArray(new TopologyEventListener[currentList.size()]);
- }
+ viewStateManager.unbind(eventListener);
}
/**
@@ -688,7 +487,7 @@ public class DiscoveryServiceImpl implem
// update the announcements but just return
// the previous oldView marked as !current
logger.info("getTopology: undefined cluster view: "+e.getReason()+"] "+e);
- oldView.markOld();
+ oldView.setNotCurrent();
if (e.getReason()==Reason.ISOLATED_FROM_TOPOLOGY) {
if (heartbeatHandler!=null) {
// SLING-5030 part 2: when we detect being isolated we should
@@ -734,195 +533,6 @@ public class DiscoveryServiceImpl implem
}
/**
- * Internal handle method which checks if anything in the topology has
- * changed and informs the TopologyEventListeners if such a change occurred.
- * <p>
- * All changes should go through this method. This method keeps track of
- * oldView/newView as well.
- */
- private void handlePotentialTopologyChange() {
- if (!activated) {
- // ignore this call then - an early call to issue
- // a topologyevent before even activated
- logger.debug("handlePotentialTopologyChange: ignoring early change before activate finished.");
- return;
- }
- if (delayedEventPending && !delayedEventPendingFailed) {
- logger.debug("handlePotentialTopologyChange: ignoring potential change since a delayed event is pending.");
- return;
- }
- TopologyViewImpl newView = (TopologyViewImpl) getTopology();
- if (initEventDelayed) {
- // this means activate could not yet send a TOPOLOGY_INIT event
- // (which can happen frequently) - so we have to do this now
- // that we potentially have a valid view
- if (!newView.isCurrent()) {
- // we cannot proceed until we're out of the isolated mode..
- // SLING-4535 : while this has warning character, it happens very frequently,
- // eg also when binding a PropertyProvider (so normal processing)
- // hence lowering to info for now
- logger.info("handlePotentialTopologyChange: still in isolated mode - cannot send TOPOLOGY_INIT yet.");
- } else {
- logger.info("handlePotentialTopologyChange: new view is no longer isolated sending delayed TOPOLOGY_INIT now.");
- // SLING-4638: OK: newView is current==true as we're just coming out of initEventDelayed first time.
- enqueueForAll(Type.TOPOLOGY_INIT, null, newView);
- initEventDelayed = false;
- }
- return;
- }
-
- TopologyViewImpl oldView = this.oldView;
- Type difference;
- if (!newView.isCurrent()) {
- difference = Type.TOPOLOGY_CHANGING;
- } else {
- difference = newView.compareTopology(oldView);
- }
- if (difference == null) { // indicating: equals
- if (delayedEventPendingFailed) {
- // when the delayed event handling for some very odd reason could
- // not re-spawn itself (via runAfter) - in that case we now
- // have listeners in CHANGING state .. which we should wake up
- enqueueForAll(Type.TOPOLOGY_CHANGED, oldView, newView);
- delayedEventPendingFailed = false;
- delayedEventPending = false;
- } else {
- // then dont send any event then
- logger.debug("handlePotentialTopologyChange: identical views. not informing listeners");
- }
- return;
- } else if (difference == Type.PROPERTIES_CHANGED) {
- enqueueForAll(Type.PROPERTIES_CHANGED, oldView, newView);
- return;
- }
- delayedEventPendingFailed = false;
- delayedEventPending = false;
-
- // else: TOPOLOGY_CHANGING or CHANGED
- if (logger.isDebugEnabled()) {
- logger.debug("handlePotentialTopologyChange: difference: {}, oldView={}, newView={}",
- new Object[] {difference, oldView, newView});
- }
-
- // send a TOPOLOGY_CHANGING first
- logger.info("handlePotentialTopologyChange: sending "+Type.TOPOLOGY_CHANGING+
- " to all listeners (that have not gotten one yet) (oldView={}).", oldView);
- oldView.markOld();
- for (final TopologyEventListener da : eventListeners) {
- enqueueAsyncTopologyEvent(da, new TopologyEvent(Type.TOPOLOGY_CHANGING, oldView,
- null));
- }
-
- int minEventDelay = config.getMinEventDelay();
- if ((!newView.isCurrent()) && minEventDelay<=0) {
- // if newView is isolated
- // then we should not send a TOPOLOGY_CHANGED yet - but instead
- // wait until the view gets resolved. that is achieved by
- // going into event-delaying and retrying that way.
- // and if minEventDelay is not configured, then auto-switch
- // to a 1sec such minEventDelay:
- minEventDelay=1;
- }
-
- if (minEventDelay<=0) {
- // otherwise, send the TOPOLOGY_CHANGED now
- enqueueForAll(Type.TOPOLOGY_CHANGED, oldView, newView);
- return;
- }
-
- // then delay the sending of the next event
- logger.debug("handlePotentialTopologyChange: delaying event sending to avoid event flooding");
-
- if (runAfter(minEventDelay /*seconds*/ , new Runnable() {
-
- public void run() {
- logger.debug("handlePotentialTopologyChange: acquiring synchronized(lock)...");
- synchronized(lock) {
- logger.debug("handlePotentialTopologyChange: sending delayed event now");
- if (!activated) {
- delayedEventPending = false;
- delayedEventPendingFailed = false;
- logger.debug("handlePotentialTopologyChange: no longer activated. not sending delayed event");
- return;
- }
- final TopologyViewImpl newView = (TopologyViewImpl) getTopology();
- // irrespective of the difference, send the latest topology
- // via a topology_changed event (since we already sent a changing)
- if (!newView.isCurrent()) {
- // if the newView is isolated at this stage we have sent
- // TOPOLOGY_CHANGING to the listeners, and they are now waiting
- // for TOPOLOGY_CHANGED. But we can't send them that yet..
- // we must do a loop via the minEventDelay mechanism and log
- // accordingly
- if (runAfter(1/*sec*/, this)) {
- logger.warn("handlePotentialTopologyChange: local instance is isolated from topology. Waiting for rejoining...");
- return;
- }
- // otherwise we have to fall back to still sending a TOPOLOGY_CHANGED
- // but that's unexpected! (back to delayedEventPending=false..)
- delayedEventPendingFailed = true;
- logger.warn("handlePotentialTopologyChange: local instance is isolated from topology but failed to trigger delay-job");
- return;
- }
-
- enqueueForAll(Type.TOPOLOGY_CHANGED, DiscoveryServiceImpl.this.oldView, newView);
- delayedEventPending = false;
- delayedEventPendingFailed = false;
- }
- }
- })) {
- delayedEventPending = true;
- delayedEventPendingFailed = false;
- logger.debug("handlePotentialTopologyChange: delayed event triggering.");
- return;
- } else {
- logger.debug("handlePotentialTopologyChange: delaying event triggering did not work for some reason. "
- + "Will be retriggered lazily via later heartbeat.");
- delayedEventPending = true;
- delayedEventPendingFailed = true;
- return;
- }
- }
-
- private void enqueueForAll(Type eventType, TopologyViewImpl oldView, TopologyViewImpl newView) {
- if (oldView!=null) {
- oldView.markOld();
- }
- logger.info("enqueueForAll: sending "+eventType+" to all listeners (oldView={}, newView={}).", oldView, newView);
- for (final TopologyEventListener da : eventListeners) {
- enqueueAsyncTopologyEvent(da, new TopologyEvent(eventType, oldView, newView));
- }
- if (eventType!=Type.TOPOLOGY_CHANGING) {
- setOldView(newView);
- }
- if (heartbeatHandler!=null) {
- // trigger a heartbeat 'now' to pass it on to the topology asap
- heartbeatHandler.triggerHeartbeat();
- }
- }
-
- /**
- * run the runnable after the indicated number of seconds, once.
- * @return true if the scheduling of the runnable worked, false otherwise
- */
- private boolean runAfter(int seconds, final Runnable runnable) {
- final Scheduler theScheduler = scheduler;
- if (theScheduler == null) {
- logger.info("runAfter: no scheduler set");
- return false;
- }
- logger.debug("runAfter: trying with scheduler.fireJob");
- final Date date = new Date(System.currentTimeMillis() + seconds * 1000);
- try {
- theScheduler.fireJobAt(null, runnable, null, date);
- return true;
- } catch (Exception e) {
- logger.info("runAfter: could not schedule a job: "+e);
- return false;
- }
- }
-
- /**
* Internal class caching some provider infos like service id and ranking.
*/
private final static class ProviderInfo implements Comparable<ProviderInfo> {
@@ -1002,13 +612,27 @@ public class DiscoveryServiceImpl implem
/**
* Handle the fact that the topology has likely changed
*/
- public void handleTopologyChanged() {
- logger.debug("handleTopologyChanged: calling handlePotentialTopologyChange.");
- synchronized ( this.lock ) {
- handlePotentialTopologyChange();
+ public void handlePotentialTopologyChange() {
+ BaseTopologyView t = (BaseTopologyView) getTopology();
+ if (t.isCurrent()) {
+ // if we have a valid view, let the viewStateManager do the
+ // comparison and sending of an event, if necessary
+ viewStateManager.handleNewView(t);
+ } else {
+ // if we don't have a valid (current) view, then we might have to send
+ // a CHANGING event - let the viewStateManager decide that as well
+ viewStateManager.handleChanging();
}
}
+ /**
+ * Handle the fact that the topology has started to change - inform the listeners asap
+ */
+ public void handleTopologyChanging() {
+ logger.debug("handleTopologyChanging: invoking viewStateManager.handlechanging");
+ viewStateManager.handleChanging();
+ }
+
/** SLING-2901 : send a TOPOLOGY_CHANGING event and shutdown the service thereafter **/
public void forcedShutdown() {
synchronized(lock) {
@@ -1022,11 +646,8 @@ public class DiscoveryServiceImpl implem
}
logger.error("forcedShutdown: sending TOPOLOGY_CHANGING to all listeners");
// SLING-4638: make sure the oldView is really marked as old:
- oldView.markOld();
- for (final TopologyEventListener da : eventListeners) {
- enqueueAsyncTopologyEvent(da, new TopologyEvent(Type.TOPOLOGY_CHANGING, oldView,
- null));
- }
+ oldView.setNotCurrent();
+ viewStateManager.handleChanging();
logger.error("forcedShutdown: deactivating DiscoveryService.");
// to make sure no further event is sent after this, flag this service as deactivated
activated = false;
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java Mon Oct 12 13:54:14 2015
@@ -280,6 +280,7 @@ public class HeartbeatHandler implements
// use 'fireJobAt' here, instead of 'fireJob' to make sure the job can always be triggered
// 'fireJob' checks for a job from the same job-class to already exist
// 'fireJobAt' though allows to pass a name for the job - which can be made unique, thus does not conflict/already-exist
+ logger.info("triggerHeartbeat: firing job to trigger heartbeat");
scheduler.fireJobAt(NAME+UUID.randomUUID(), this, null, new Date(System.currentTimeMillis()-1000 /* make sure it gets triggered immediately*/));
} catch (Exception e) {
logger.info("triggerHeartbeat: Could not trigger heartbeat: " + e);
@@ -396,6 +397,7 @@ public class HeartbeatHandler implements
// sling instance accessing the same repository (ie in the same cluster)
// using the same sling.id - hence writing to the same
// resource
+ discoveryService.handleTopologyChanging();
logger.error("issueClusterLocalHeartbeat: SLING-2892: Detected unexpected, concurrent update of: "+
myClusterNodePath+" 'lastHeartbeat'. If not done manually, " +
"this likely indicates that there is more than 1 instance running in this cluster" +
@@ -415,6 +417,7 @@ public class HeartbeatHandler implements
// someone deleted the resource property
firstHeartbeatWritten = -1;
} else if (!runtimeId.equals(readRuntimeId)) {
+ discoveryService.handleTopologyChanging();
final String slingHomePath = slingSettingsService==null ? "n/a" : slingSettingsService.getSlingHomePath();
final String endpointsAsString = getEndpointsAsString();
final String readEndpoints = resourceMap.get(PROPERTY_ID_ENDPOINTS, String.class);
@@ -575,6 +578,11 @@ public class HeartbeatHandler implements
if (winningVoting != null || (numOpenNonWinningVotes > 0)) {
// then there are votings pending and I shall wait for them to
// settle
+
+ // but first: make sure we send the TOPOLOGY_CHANGING event
+ logger.info("doCheckView: there are pending votings, marking topology as changing...");
+ discoveryService.handleTopologyChanging();
+
if (logger.isDebugEnabled()) {
logger.debug("doCheckView: "
+ numOpenNonWinningVotes
@@ -595,6 +603,11 @@ public class HeartbeatHandler implements
logger.debug("doCheckView: no pending nor winning votes. view is fine. we're all happy.");
return;
}
+
+ // immediately send a TOPOLOGY_CHANGING - could already be sent, but just to be sure
+ logger.info("doCheckView: no matching established view, marking topology as changing");
+ discoveryService.handleTopologyChanging();
+
if (logger.isDebugEnabled()) {
logger.debug("doCheckView: no pending nor winning votes. But: view does not match established or no established yet. Initiating a new voting");
Iterator<String> it = liveInstances.iterator();
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyChangeHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyChangeHandler.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyChangeHandler.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyChangeHandler.java Mon Oct 12 13:54:14 2015
@@ -131,7 +131,7 @@ public class TopologyChangeHandler imple
/** Inform the DiscoveryServiceImpl that the topology (might) have changed **/
private void handleTopologyChanged() {
- discoveryService.handleTopologyChanged();
+ discoveryService.handlePotentialTopologyChange();
}
}
\ No newline at end of file
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyViewImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyViewImpl.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyViewImpl.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/TopologyViewImpl.java Mon Oct 12 13:54:14 2015
@@ -28,20 +28,17 @@ import org.apache.sling.discovery.Cluste
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.InstanceFilter;
import org.apache.sling.discovery.TopologyEvent.Type;
-import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation of the topology view
*/
-public class TopologyViewImpl implements TopologyView {
+public class TopologyViewImpl extends BaseTopologyView {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
- /** Whether or not this topology is considered 'current' / ie currently valid **/
- private volatile boolean current = true;
-
/** the instances that are part of this topology **/
private final Set<InstanceDescription> instances = new HashSet<InstanceDescription>();
@@ -116,7 +113,7 @@ public class TopologyViewImpl implements
return false;
}
TopologyViewImpl other = (TopologyViewImpl) obj;
- if (this.current != other.current) {
+ if (this.isCurrent() != other.isCurrent()) {
return false;
}
Type diff = compareTopology(other);
@@ -135,20 +132,6 @@ public class TopologyViewImpl implements
}
/**
- * @see org.apache.sling.discovery.TopologyView#isCurrent()
- */
- public boolean isCurrent() {
- return current;
- }
-
- /**
- * Mark this topology as old
- */
- public void markOld() {
- this.current = false;
- }
-
- /**
* @see org.apache.sling.discovery.TopologyView#getLocalInstance()
*/
public InstanceDescription getLocalInstance() {
@@ -234,7 +217,12 @@ public class TopologyViewImpl implements
@Override
public String toString() {
- return "TopologyViewImpl [current=" + current + ", super.hashCode=" + super.hashCode() +
- ", instances=" + instances + "]";
+ return "TopologyViewImpl [current=" + isCurrent() + ", num=" + instances.size() + ", instances="
+ + instances + "]";
+ }
+
+ @Override
+ public String getLocalClusterSyncTokenId() {
+ throw new IllegalStateException("no syncToken applicable");
}
}
Modified: sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/ClusterTest.java Mon Oct 12 13:54:14 2015
@@ -97,7 +97,7 @@ public class ClusterTest {
public void setup() throws Exception {
final org.apache.log4j.Logger discoveryLogger = LogManager.getRootLogger().getLogger("org.apache.sling.discovery");
logLevel = discoveryLogger.getLevel();
- discoveryLogger.setLevel(Level.DEBUG);
+ discoveryLogger.setLevel(Level.TRACE);
logger.debug("here we are");
instance1 = Instance.newStandaloneInstance("firstInstance", true);
instance2 = Instance.newClusterInstance("secondInstance", instance1,
@@ -1355,12 +1355,15 @@ public class ClusterTest {
// let the instance1 become alone, instance2 is idle
instance1.getConfig().setHeartbeatTimeout(2);
instance2.getConfig().setHeartbeatTimeout(2);
+ logger.info("testLongRunningListener : letting instance2 remain silent from now on");
instance1.runHeartbeatOnce();
Thread.sleep(1500);
instance1.runHeartbeatOnce();
Thread.sleep(1500);
instance1.runHeartbeatOnce();
Thread.sleep(1500);
+ instance1.runHeartbeatOnce();
+ logger.info("testLongRunningListener : instance 2 should now be considered dead");
// instance1.dumpRepo();
LongRunningListener longRunningListener1 = new LongRunningListener();
@@ -1368,11 +1371,14 @@ public class ClusterTest {
fastListener2.addExpected(Type.TOPOLOGY_INIT);
longRunningListener1.assertNoFail();
assertEquals(1, fastListener2.getRemainingExpectedCount());
+ logger.info("testLongRunningListener : binding longRunningListener1 ...");
instance1.bindTopologyEventListener(longRunningListener1);
+ logger.info("testLongRunningListener : binding fastListener2 ...");
instance1.bindTopologyEventListener(fastListener2);
- Thread.sleep(500); // SLING-4755: async event sending requires some minimal wait time nowadays
- assertTrue(longRunningListener1.initReceived);
+ logger.info("testLongRunningListener : waiting a bit for longRunningListener1 to receive the TOPOLOGY_INIT event");
+ Thread.sleep(2500); // SLING-4755: async event sending requires some minimal wait time nowadays
assertEquals(0, fastListener2.getRemainingExpectedCount());
+ assertTrue(longRunningListener1.initReceived);
// after INIT, now do an actual change where listener1 will do a long-running handling
fastListener2.addExpected(Type.TOPOLOGY_CHANGING);
Modified: sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java Mon Oct 12 13:54:14 2015
@@ -194,6 +194,7 @@ public class SingleInstanceTest {
AssertingTopologyEventListener assertingTopologyEventListener = new AssertingTopologyEventListener();
assertingTopologyEventListener.addExpected(Type.TOPOLOGY_INIT);
+ logger.info("testTopologyEventListeners: binding the event listener");
instance.bindTopologyEventListener(assertingTopologyEventListener);
Thread.sleep(500); // SLING-4755: async event sending requires some minimal wait time nowadays
assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
@@ -210,6 +211,8 @@ public class SingleInstanceTest {
instance.bindPropertyProvider(pp, propertyName);
logger.info("testTopologyEventListeners: 3rd sleep 1.5s");
Thread.sleep(1500);
+ logger.info("testTopologyEventListeners: dumping due to failure: ");
+ assertingTopologyEventListener.dump();
assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
// we can only assume that the getProperty was called at least once - it
// could be called multiple times though..
@@ -226,14 +229,14 @@ public class SingleInstanceTest {
logger.info("testTopologyEventListeners: 4th sleep 2s");
Thread.sleep(2000);
assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
- assertEquals(2, pp.getGetCnt());
+ assertEquals(1, pp.getGetCnt());
// a heartbeat repeat should not result in another call though
instance.runHeartbeatOnce();
logger.info("testTopologyEventListeners: 5th sleep 2s");
Thread.sleep(2000);
assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
- assertEquals(3, pp.getGetCnt());
+ assertEquals(2, pp.getGetCnt());
logger.info("testTopologyEventListeners: done");
}
@@ -268,6 +271,7 @@ public class SingleInstanceTest {
instance.runHeartbeatOnce();
Thread.sleep(1000);
instance.dumpRepo();
+ ada.dump();
assertEquals(0, ada.getUnexpectedCount());
assertEquals(1, ada.getEvents().size());
TopologyEvent initEvent = ada.getEvents().remove(0);
Modified: sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/TopologyEventTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/TopologyEventTest.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/TopologyEventTest.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/TopologyEventTest.java Mon Oct 12 13:54:14 2015
@@ -138,16 +138,6 @@ public class TopologyEventTest {
assertEquals(0, l1Two.getRemainingExpectedCount()); // the expected one
assertEquals(0, l1Two.getUnexpectedCount());
- // one heartbeat doesn't change the history yet
- logger.info("testDelayedInitEvent: an additional heartbeat shouldn't trigger any event for now");
- instance1.runHeartbeatOnce();
- instance2.runHeartbeatOnce();
- assertEquals(1, l1.getEvents().size()); // one event
- assertEquals(0, l1.getUnexpectedCount());
- assertEquals(0, l2.getEvents().size());
- assertEquals(0, l2.getUnexpectedCount());
- assertEquals(1, l1Two.getEvents().size());
- assertEquals(0, l1Two.getUnexpectedCount());
// the second & third heartbeat though triggers the voting etc
logger.info("testDelayedInitEvent: two more heartbeats should trigger events");
@@ -158,6 +148,9 @@ public class TopologyEventTest {
instance1.runHeartbeatOnce();
instance2.runHeartbeatOnce();
Thread.sleep(500);
+ instance1.runHeartbeatOnce();
+ instance2.runHeartbeatOnce();
+ Thread.sleep(500);
instance1.runHeartbeatOnce();
instance2.runHeartbeatOnce();
logger.info("testDelayedInitEvent: instance1: "+instance1.slingId);
Modified: sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/helpers/AssertingTopologyEventListener.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/helpers/AssertingTopologyEventListener.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/helpers/AssertingTopologyEventListener.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/helpers/AssertingTopologyEventListener.java Mon Oct 12 13:54:14 2015
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -47,12 +48,9 @@ public class AssertingTopologyEventListe
this.debugInfo = debugInfo;
}
- public void fail(String errorMsg) {
- this.errorMsg = errorMsg;
- }
-
- public String getErrorMsg() {
- return errorMsg;
+ @Override
+ public String toString() {
+ return super.toString()+"-[debugInfo="+debugInfo+"]";
}
private List<TopologyEvent> events_ = new LinkedList<TopologyEvent>();
@@ -147,4 +145,16 @@ public class AssertingTopologyEventListe
public int getUnexpectedCount() {
return unexpectedEvents_.size();
}
+
+ /**
+ * Logs the number of received events, the unexpected ones (with details)
+ * and the current size of the list of expected events.
+ */
+ public void dump() {
+ StringBuilder ue = new StringBuilder();
+ for (TopologyEvent topologyEvent : unexpectedEvents_) {
+ ue.append(topologyEvent).append(", ");
+ }
+ logger.info("dump: got "+events_.size()+" events, "+unexpectedEvents_.size()+" (details: "+ue+") thereof unexpected. My list of expected events contains "+expectedEvents.size());
+ }
}
\ No newline at end of file
Modified: sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatTest.java?rev=1708119&r1=1708118&r2=1708119&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatTest.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatTest.java Mon Oct 12 13:54:14 2015
@@ -30,6 +30,7 @@ import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -55,12 +56,16 @@ import org.apache.sling.discovery.impl.s
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HeartbeatTest {
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
class SimpleTopologyEventListener implements TopologyEventListener {
+ private List<TopologyEvent> events = new LinkedList<TopologyEvent>();
private TopologyEvent lastEvent;
private int eventCount;
private final String name;
@@ -71,6 +76,7 @@ public class HeartbeatTest {
@Override
public void handleTopologyEvent(TopologyEvent event) {
+ events.add(event);
String msg = event.toString();
TopologyView newView = event.getNewView();
switch(event.getType()) {
@@ -119,7 +125,7 @@ public class HeartbeatTest {
public void setup() throws Exception {
final org.apache.log4j.Logger discoveryLogger = LogManager.getRootLogger().getLogger("org.apache.sling.discovery");
logLevel = discoveryLogger.getLevel();
- discoveryLogger.setLevel(Level.DEBUG);
+ discoveryLogger.setLevel(Level.TRACE);
}
@After
@@ -147,6 +153,7 @@ public class HeartbeatTest {
}
public void doTestPartitioning(boolean scheduler) throws Throwable {
+ logger.info("doTestPartitioning: creating slowMachine...");
Instance slowMachine = Instance.newStandaloneInstance("/var/discovery/impl/", "slow", true, 10 /*10sec timeout*/,
999 /* 999sec interval: to disable it*/, 0, UUID.randomUUID().toString());
assertEquals(1, slowMachine.getDiscoveryService().getTopology().getInstances().size());
@@ -155,6 +162,8 @@ public class HeartbeatTest {
Thread.sleep(10); // wait 10ms to ensure 'slowMachine' has the lowerst leaderElectionId (to become leader)
SimpleTopologyEventListener slowListener = new SimpleTopologyEventListener("slow");
slowMachine.bindTopologyEventListener(slowListener);
+
+ logger.info("doTestPartitioning: creating fastMachine1...");
Instance fastMachine1 = Instance.newClusterInstance("/var/discovery/impl/", "fast1", slowMachine, false, 10, 1, 0);
assertEquals(1, fastMachine1.getDiscoveryService().getTopology().getInstances().size());
assertEquals(fastMachine1.getSlingId(), fastMachine1.getDiscoveryService().getTopology().getInstances().iterator().next().getSlingId());
@@ -162,18 +171,24 @@ public class HeartbeatTest {
Thread.sleep(10); // wait 10ms to ensure 'fastMachine1' has the 2nd lowerst leaderElectionId (to become leader during partitioning)
SimpleTopologyEventListener fastListener1 = new SimpleTopologyEventListener("fast1");
fastMachine1.bindTopologyEventListener(fastListener1);
+
+ logger.info("doTestPartitioning: creating fastMachine2...");
Instance fastMachine2 = Instance.newClusterInstance("/var/discovery/impl/", "fast2", slowMachine, false, 10, 1, 0);
assertEquals(1, fastMachine2.getDiscoveryService().getTopology().getInstances().size());
assertEquals(fastMachine2.getSlingId(), fastMachine2.getDiscoveryService().getTopology().getInstances().iterator().next().getSlingId());
instances.add(fastMachine2);
SimpleTopologyEventListener fastListener2 = new SimpleTopologyEventListener("fast2");
fastMachine2.bindTopologyEventListener(fastListener2);
+
+ logger.info("doTestPartitioning: creating fastMachine3...");
Instance fastMachine3 = Instance.newClusterInstance("/var/discovery/impl/", "fast3", slowMachine, false, 10, 1, 0);
assertEquals(1, fastMachine3.getDiscoveryService().getTopology().getInstances().size());
assertEquals(fastMachine3.getSlingId(), fastMachine3.getDiscoveryService().getTopology().getInstances().iterator().next().getSlingId());
instances.add(fastMachine3);
SimpleTopologyEventListener fastListener3 = new SimpleTopologyEventListener("fast3");
fastMachine3.bindTopologyEventListener(fastListener3);
+
+ logger.info("doTestPartitioning: creating fastMachine4...");
Instance fastMachine4 = Instance.newClusterInstance("/var/discovery/impl/", "fast4", slowMachine, false, 10, 1, 0);
assertEquals(1, fastMachine4.getDiscoveryService().getTopology().getInstances().size());
assertEquals(fastMachine4.getSlingId(), fastMachine4.getDiscoveryService().getTopology().getInstances().iterator().next().getSlingId());
@@ -181,6 +196,7 @@ public class HeartbeatTest {
SimpleTopologyEventListener fastListener4 = new SimpleTopologyEventListener("fast4");
fastMachine4.bindTopologyEventListener(fastListener4);
+ logger.info("doTestPartitioning: letting heartbeats be sent by all instances for a few loops...");
HeartbeatHandler hhSlow = slowMachine.getHeartbeatHandler();
for(int i=0; i<3; i++) {
hhSlow.issueHeartbeat();
@@ -199,6 +215,7 @@ public class HeartbeatTest {
}
// at this stage the 4 fast plus the slow instance should all see each other
+ logger.info("doTestPartitioning: all 4 instances should have agreed on seeing each other");
assertNotNull(fastListener1.getLastEvent());
assertEquals(TopologyEvent.Type.TOPOLOGY_INIT, fastListener1.getLastEvent().getType());
assertEquals(5, fastListener1.getLastEvent().getNewView().getInstances().size());
@@ -221,6 +238,7 @@ public class HeartbeatTest {
assertTrue(slowListener.getLastEvent().getNewView().getLocalInstance().isLeader());
// after 12sec the slow instance' heartbeat should have timed out
+ logger.info("doTestPartitioning: letting slowMachine NOT send any heartbeats for 12sec, only the fast ones do...");
for(int i=0; i<12; i++) {
if (!scheduler) {
fastMachine1.getHeartbeatHandler().issueHeartbeat();
@@ -234,10 +252,13 @@ public class HeartbeatTest {
}
Thread.sleep(1000);
}
+ logger.info("doTestPartitioning: this should now have decoupled slowMachine from the other 4...");
// so the fast listeners should only see 4 instances remaining
for(int i=0; i<7; i++) {
+ logger.info("doTestPartitioning: sleeping 2sec...");
Thread.sleep(2000);
+ logger.info("doTestPartitioning: the 4 fast machines should all just see themselves...");
assertEquals(TopologyEvent.Type.TOPOLOGY_CHANGED, fastListener1.getLastEvent().getType());
assertEquals(4, fastListener1.getLastEvent().getNewView().getInstances().size());
assertEquals(TopologyEvent.Type.TOPOLOGY_CHANGED, fastListener2.getLastEvent().getType());
@@ -254,7 +275,8 @@ public class HeartbeatTest {
// and the slow instance should be isolated
assertFalse(slowMachine.getDiscoveryService().getTopology().isCurrent());
- assertEquals(5, slowMachine.getDiscoveryService().getTopology().getInstances().size());
+ // however we can't really make any assertions on how many instances a non-current topology contains...
+ // assertEquals(5, slowMachine.getDiscoveryService().getTopology().getInstances().size());
if (i==0) {
assertEquals(TopologyEvent.Type.TOPOLOGY_INIT, slowListener.getLastEvent().getType());
} else {
@@ -272,7 +294,8 @@ public class HeartbeatTest {
TopologyView slowTopo = slowMachine.getDiscoveryService().getTopology();
assertNotNull(slowTopo);
assertFalse(slowTopo.isCurrent());
- assertEquals(5, slowTopo.getInstances().size());
+ // again, can't make any assertion on how many instances the slow non-current view contains...
+ //assertEquals(5, slowTopo.getInstances().size());
if (!scheduler) {
fastMachine1.getHeartbeatHandler().issueHeartbeat();
fastMachine1.getHeartbeatHandler().checkView();
@@ -341,11 +364,13 @@ public class HeartbeatTest {
}
public void doTestSlowAndFastMachine() throws Throwable {
+ logger.info("doTestSlowAndFastMachine: creating slowMachine... (w/o heartbeat runner)");
Instance slowMachine = Instance.newStandaloneInstance("/var/discovery/impl/", "slow", true, 5 /*5sec timeout*/,
999 /* 999sec interval: to disable it*/, 0, UUID.randomUUID().toString());
instances.add(slowMachine);
SimpleTopologyEventListener slowListener = new SimpleTopologyEventListener("slow");
slowMachine.bindTopologyEventListener(slowListener);
+ logger.info("doTestSlowAndFastMachine: creating fastMachine... (w/o heartbeat runner)");
Instance fastMachine = Instance.newClusterInstance("/var/discovery/impl/", "fast", slowMachine, false, 5, 999, 0);
instances.add(fastMachine);
SimpleTopologyEventListener fastListener = new SimpleTopologyEventListener("fast");
@@ -354,12 +379,14 @@ public class HeartbeatTest {
HeartbeatHandler hhFast = fastMachine.getHeartbeatHandler();
Thread.sleep(1000);
+ logger.info("doTestSlowAndFastMachine: no event should have been triggered yet");
assertFalse(fastMachine.getDiscoveryService().getTopology().isCurrent());
assertFalse(slowMachine.getDiscoveryService().getTopology().isCurrent());
assertNull(fastListener.getLastEvent());
assertNull(slowListener.getLastEvent());
// make few rounds of heartbeats so that the two instances see each other
+ logger.info("doTestSlowAndFastMachine: send a couple of heartbeats to connect the two..");
for(int i=0; i<5; i++) {
hhSlow.issueHeartbeat();
hhSlow.checkView();
@@ -367,6 +394,7 @@ public class HeartbeatTest {
hhFast.checkView();
Thread.sleep(100);
}
+ logger.info("doTestSlowAndFastMachine: now the two instances should be connected.");
slowMachine.dumpRepo();
assertEquals(2, slowMachine.getDiscoveryService().getTopology().getInstances().size());
@@ -377,12 +405,15 @@ public class HeartbeatTest {
assertEquals(1, slowListener.getEventCount());
// now let the slow machine be slow while the fast one updates as expected
+ logger.info("doTestSlowAndFastMachine: last heartbeat of slowMachine.");
hhSlow.issueHeartbeat();
+ logger.info("doTestSlowAndFastMachine: while the fastMachine still sends heartbeats...");
for(int i=0; i<6; i++) {
Thread.sleep(1500);
hhFast.issueHeartbeat();
hhFast.checkView();
}
+ logger.info("doTestSlowAndFastMachine: now the fastMachine should have decoupled the slow one");
fastMachine.dumpRepo();
hhFast.checkView(); // one more for the start of the vote
fastMachine.dumpRepo();
@@ -398,8 +429,10 @@ public class HeartbeatTest {
assertFalse(topo.isCurrent());
// after those 6 sec, hhSlow does the check (6sec between heartbeat and check)
+ logger.info("doTestSlowAndFastMachine: slowMachine is going to do a checkView next - and will detect being decoupled");
hhSlow.checkView();
slowMachine.dumpRepo();
+ logger.info("doTestSlowAndFastMachine: slowMachine is going to also do a heartbeat next");
hhSlow.issueHeartbeat();
assertEquals(TopologyEvent.Type.TOPOLOGY_CHANGED, fastListener.getLastEvent().getType());
assertEquals(3, fastListener.getEventCount());
@@ -407,6 +440,7 @@ public class HeartbeatTest {
assertEquals(2, slowListener.getEventCount());
Thread.sleep(8000);
// even after 8 sec the slow lsitener did not send a TOPOLOGY_CHANGED yet
+ logger.info("doTestSlowAndFastMachine: after another 8 sec of silence from slowMachine, it should still remain in CHANGING state");
assertEquals(TopologyEvent.Type.TOPOLOGY_CHANGING, slowListener.getLastEvent().getType());
assertFalse(slowMachine.getDiscoveryService().getTopology().isCurrent());
assertEquals(2, slowListener.getEventCount());
@@ -415,6 +449,7 @@ public class HeartbeatTest {
assertEquals(3, fastListener.getEventCount());
// make few rounds of heartbeats so that the two instances see each other again
+ logger.info("doTestSlowAndFastMachine: now let both fast and slow issue heartbeats...");
for(int i=0; i<4; i++) {
hhFast.issueHeartbeat();
hhFast.checkView();
@@ -422,6 +457,7 @@ public class HeartbeatTest {
hhSlow.checkView();
Thread.sleep(1000);
}
+ logger.info("doTestSlowAndFastMachine: by now the two should have joined");
// this should have put the two together again
// even after 8 sec the slow lsitener did not send a TOPOLOGY_CHANGED yet
@@ -489,15 +525,20 @@ public class HeartbeatTest {
*/
@Test
public void testVotingLoop() throws Throwable {
+ logger.info("testVotingLoop: creating slowMachine1...");
Instance slowMachine1 = Instance.newStandaloneInstance("/var/discovery/impl/", "slow1", true, 600 /*600sec timeout*/,
999 /* 999sec interval: to disable it*/, 0, UUID.randomUUID().toString());
instances.add(slowMachine1);
SimpleTopologyEventListener slowListener1 = new SimpleTopologyEventListener("slow1");
slowMachine1.bindTopologyEventListener(slowListener1);
+
+ logger.info("testVotingLoop: creating slowMachine2...");
Instance slowMachine2 = Instance.newClusterInstance("/var/discovery/impl/", "slow2", slowMachine1, false, 600, 999, 0);
instances.add(slowMachine2);
SimpleTopologyEventListener slowListener2 = new SimpleTopologyEventListener("slow2");
slowMachine2.bindTopologyEventListener(slowListener2);
+
+ logger.info("testVotingLoop: creating fastMachine...");
Instance fastMachine = Instance.newClusterInstance("/var/discovery/impl/", "fast", slowMachine1, false, 600, 999, 0);
instances.add(fastMachine);
SimpleTopologyEventListener fastListener = new SimpleTopologyEventListener("fast");
@@ -507,6 +548,7 @@ public class HeartbeatTest {
HeartbeatHandler hhFast = fastMachine.getHeartbeatHandler();
Thread.sleep(1000);
+ logger.info("testVotingLoop: after some initial 1sec sleep no event should yet have been sent");
assertFalse(fastMachine.getDiscoveryService().getTopology().isCurrent());
assertFalse(slowMachine1.getDiscoveryService().getTopology().isCurrent());
assertFalse(slowMachine2.getDiscoveryService().getTopology().isCurrent());
@@ -515,9 +557,11 @@ public class HeartbeatTest {
assertNull(slowListener2.getLastEvent());
// prevent the slow machine from voting
+ logger.info("testVotingLoop: stopping voting of slowMachine1...");
slowMachine1.stopVoting();
// now let all issue a heartbeat
+ logger.info("testVotingLoop: letting slow1, slow2 and fast all issue 1 heartbeat");
hhSlow1.issueHeartbeat();
hhSlow2.issueHeartbeat();
hhFast.issueHeartbeat();
@@ -525,11 +569,14 @@ public class HeartbeatTest {
// now let the fast one start a new voting, to which
// only the fast one will vote, the slow one doesn't.
// that will cause a voting loop
+ logger.info("testVotingLoop: let the fast one do a checkView, thus initiate a voting");
hhFast.checkView();
Calendar previousVotedAt = null;
for(int i=0; i<5; i++) {
+ logger.info("testVotingLoop: sleeping 1sec...");
Thread.sleep(1000);
+ logger.info("testVotingLoop: check to see that there is no voting loop...");
// now check the ongoing votings
ResourceResolverFactory factory = fastMachine.getResourceResolverFactory();
ResourceResolver resourceResolver = factory