You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2015/10/08 16:31:46 UTC

svn commit: r1707548 [1/4] - in /sling/trunk/bundles/extensions/discovery/commons: ./ src/main/java/org/apache/sling/discovery/commons/providers/ src/main/java/org/apache/sling/discovery/commons/providers/impl/ src/main/java/org/apache/sling/discovery/...

Author: stefanegli
Date: Thu Oct  8 14:31:45 2015
New Revision: 1707548

URL: http://svn.apache.org/viewvc?rev=1707548&view=rev
Log:
SLING-5131 : introducing ConsistencyService and an oak-discovery-lite based implementation of it - plus SLING-4697 : support for PROPERTIES_CHANGED added to ViewStateManagerImpl

Added:
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerImpl.java
      - copied, changed from r1706814, sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestViewStateManager.java
      - copied, changed from r1706814, sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/TestViewStateManager.java
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/DiscoLite.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/MockFactory.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/MockedResource.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/MockedResourceResolver.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/RepositoryHelper.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/TestOakSyncTokenConsistencyService.java   (with props)
    sling/trunk/bundles/extensions/discovery/commons/src/test/resources/
    sling/trunk/bundles/extensions/discovery/commons/src/test/resources/log4j.properties   (with props)
Removed:
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java
    sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/TestViewStateManager.java
Modified:
    sling/trunk/bundles/extensions/discovery/commons/pom.xml
    sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/BaseTopologyView.java

Modified: sling/trunk/bundles/extensions/discovery/commons/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/pom.xml?rev=1707548&r1=1707547&r2=1707548&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/pom.xml (original)
+++ sling/trunk/bundles/extensions/discovery/commons/pom.xml Thu Oct  8 14:31:45 2015
@@ -57,11 +57,77 @@
             <artifactId>bndlib</artifactId>
         </dependency>
         <dependency>
+        	<groupId>org.apache.sling</groupId>
+        	<artifactId>org.apache.sling.commons.scheduler</artifactId>
+        	<version>2.3.4</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.discovery.api</artifactId>
             <version>1.0.0</version>
             <scope>provided</scope>
         </dependency>
+		<dependency>
+			<groupId>org.apache.sling</groupId>
+			<artifactId>org.apache.sling.api</artifactId>
+			<version>2.4.0</version>
+            <scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.sling</groupId>
+			<artifactId>org.apache.sling.commons.json</artifactId>
+			<version>2.0.6</version>
+            <scope>provided</scope>
+		</dependency>
+		<dependency>
+			<groupId>javax.jcr</groupId>
+			<artifactId>jcr</artifactId>
+			<version>2.0</version>
+			<scope>provided</scope>
+		</dependency>
+        <dependency>
+        	<groupId>org.apache.sling</groupId>
+        	<artifactId>org.apache.sling.commons.testing</artifactId>
+        	<version>2.0.16</version>
+        	<scope>test</scope>
+            <exclusions>
+                <!-- slf4j simple implementation logs INFO + higher to stdout (we don't want that behaviour) -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-simple</artifactId>
+                </exclusion>
+                <!--  also excluding jcl-over-slf4j as we need a newer vesion of this which is compatible with slf4j 1.6 -->
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>jcl-over-slf4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+        	<!--  needed to get the nodetypes for testing properly with sling: prefix -->
+        	<groupId>org.apache.sling</groupId>
+        	<artifactId>org.apache.sling.jcr.resource</artifactId>
+        	<version>2.3.8</version>
+        	<scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-core</artifactId>
+            <version>1.3.7</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.jackrabbit</groupId>
+            <artifactId>oak-jcr</artifactId>
+            <version>1.3.7</version>
+            <scope>provided</scope>
+        </dependency>
+		<dependency>
+			<groupId>javax.servlet</groupId>
+			<artifactId>servlet-api</artifactId>
+        	<scope>test</scope>
+		</dependency>
         <dependency>
             <groupId>com.google.code.findbugs</groupId>
             <artifactId>jsr305</artifactId>
@@ -83,5 +149,31 @@
             <version>1.9.5</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+        	<groupId>org.slf4j</groupId>
+        	<artifactId>slf4j-log4j12</artifactId>
+        	<version>1.7.5</version>
+        	<scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.13</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+        	<groupId>org.apache.jackrabbit</groupId>
+        	<artifactId>jackrabbit-jcr-commons</artifactId>
+        	<version>2.11.0</version>
+        	<type>bundle</type>
+        	<scope>test</scope>
+        </dependency>
+        <dependency>
+        	<groupId>org.apache.jackrabbit</groupId>
+        	<artifactId>jackrabbit-api</artifactId>
+        	<version>2.11.0</version>
+        	<type>bundle</type>
+        	<scope>test</scope>
+        </dependency>
     </dependencies>
 </project>

Modified: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/BaseTopologyView.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/BaseTopologyView.java?rev=1707548&r1=1707547&r2=1707548&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/BaseTopologyView.java (original)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/BaseTopologyView.java Thu Oct  8 14:31:45 2015
@@ -49,4 +49,37 @@ public abstract class BaseTopologyView i
         current = false;
     }
 
+    /**
+     * Returns the id that shall be used in the syncToken
+     * by the ConsistencyService.
+     * <p>
+     * The clusterSyncId uniquely identifies each change
+     * of the local cluster for all participating instances. 
+     * That means, all participating instances know of the 
+     * clusterSyncId and it is the same for all instances.
+     * Whenever an instance joins/leaves the cluster, this
+     * clusterSyncId must change. 
+     * <p>
+     * Since this method returns the *local* clusterSyncId,
+     * it doesn't care if a remote cluster experienced
+     * changes - it must only change when the local cluster changes.
+     * However, it *can* change when a remote cluster changes too.
+     * So the requirement is just that it changes *at least* when
+     * the local cluster changes - but implementations
+     * can opt to regard this rather as a TopologyView-ID too
+     * (ie an ID that identifies a particular incarnation
+     * of the TopologyView for all participating instances
+     * in the whole topology).
+     * <p>
+     * This id can further safely be used by the ConsistencyService
+     * to identify a syncToken that it writes and that all
+     * other instances in the lcoal cluster wait for, before
+     * sending a TOPOLOGY_CHANGED event.
+     * <p>
+     * Note that this is obviously not to be confused
+     * with the ClusterView.getId() which is stable throughout
+     * the lifetime of a cluster.
+     */
+    public abstract String getLocalClusterSyncTokenId();
+
 }

Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+
+/** Factory for creating TopologyEvents with BaseTopologyView **/
+public class EventFactory {
+
+    /** Simple factory method for creating a TOPOLOGY_INIT event with the given newView **/
+    public static TopologyEvent newInitEvent(final BaseTopologyView newView) {
+        if (newView==null) {
+            throw new IllegalStateException("newView must not be null");
+        }
+        if (!newView.isCurrent()) {
+            throw new IllegalStateException("newView must be current");
+        }
+        return new TopologyEvent(Type.TOPOLOGY_INIT, null, newView);
+    }
+
+    /** Simple factory method for creating a TOPOLOGY_CHANGING event with the given oldView **/
+    public static TopologyEvent newChangingEvent(final BaseTopologyView oldView) {
+        if (oldView==null) {
+            throw new IllegalStateException("oldView must not be null");
+        }
+        if (oldView.isCurrent()) {
+            throw new IllegalStateException("oldView must not be current");
+        }
+        return new TopologyEvent(Type.TOPOLOGY_CHANGING, oldView, null);
+    }
+
+    /** Simple factory method for creating a TOPOLOGY_CHANGED event with the given old and new views **/
+    public static TopologyEvent newChangedEvent(final BaseTopologyView oldView, final BaseTopologyView newView) {
+        if (oldView==null) {
+            throw new IllegalStateException("oldView must not be null");
+        }
+        if (oldView.isCurrent()) {
+            throw new IllegalStateException("oldView must not be current");
+        }
+        if (newView==null) {
+            throw new IllegalStateException("newView must not be null");
+        }
+        if (!newView.isCurrent()) {
+            throw new IllegalStateException("newView must be current");
+        }
+        return new TopologyEvent(Type.TOPOLOGY_CHANGED, oldView, newView);
+    }
+
+    public static TopologyEvent newPropertiesChangedEvent(final BaseTopologyView oldView, final BaseTopologyView newView) {
+        if (oldView==null) {
+            throw new IllegalStateException("oldView must not be null");
+        }
+        if (oldView.isCurrent()) {
+            throw new IllegalStateException("oldView must not be current");
+        }
+        if (newView==null) {
+            throw new IllegalStateException("newView must not be null");
+        }
+        if (!newView.isCurrent()) {
+            throw new IllegalStateException("newView must be current");
+        }
+        return new TopologyEvent(Type.PROPERTIES_CHANGED, oldView, newView);
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+
+/** SLING-4755 : encapsulates an event that yet has to be sent (asynchronously) for a particular listener **/
+final class AsyncEvent {
+    final TopologyEventListener listener;
+    final TopologyEvent event;
+    AsyncEvent(TopologyEventListener listener, TopologyEvent event) {
+        if (listener==null) {
+            throw new IllegalArgumentException("listener must not be null");
+        }
+        if (event==null) {
+            throw new IllegalArgumentException("event must not be null");
+        }
+        this.listener = listener;
+        this.event = event;
+    }
+    @Override
+    public String toString() {
+        return "an AsyncEvent[event="+event+", listener="+listener+"]";
+    }
+}
\ No newline at end of file

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** 
+ * SLING-4755 : background runnable that takes care of asynchronously sending events.
+ * <p>
+ * API is: enqueue() puts a listener-event tuple onto the internal Q, which
+ * is processed in a loop in run that does so (uninterruptably, even catching
+ * Throwables to be 'very safe', but sleeps 5sec if an Error happens) until
+ * flushThenStop() is called - which puts the sender in a state where any pending
+ * events are still sent (flush) but then stops automatically. The argument of
+ * using flush before stop is that the event was originally meant to be sent
+ * before the bundle was stopped - thus just because the bundle is stopped
+ * doesn't undo the event and it still has to be sent. That obviously can
+ * mean that listeners can receive a topology event after deactivate. But I
+ * guess that was already the case before the change to become asynchronous.
+ */
+final class AsyncEventSender implements Runnable {
+    
+    static final Logger logger = LoggerFactory.getLogger(AsyncEventSender.class);
+
+    /** stopped is always false until flushThenStop is called **/
+    private boolean stopped = false;
+
+    /** eventQ contains all AsyncEvent objects that have yet to be sent - in order to be sent **/
+    private final List<AsyncEvent> eventQ = new LinkedList<AsyncEvent>();
+    
+    /** flag to track whether or not an event is currently being sent (but already taken off the Q **/
+    private boolean isSending = false;
+    
+    /** Enqueues a particular event for asynchronous sending to a particular listener **/
+    void enqueue(TopologyEventListener listener, TopologyEvent event) {
+        final AsyncEvent asyncEvent = new AsyncEvent(listener, event);
+        synchronized(eventQ) {
+            eventQ.add(asyncEvent);
+            if (logger.isDebugEnabled()) {
+                logger.debug("enqueue: enqueued event {} for async sending (Q size: {})", asyncEvent, eventQ.size());
+            }
+            eventQ.notifyAll();
+        }
+    }
+    
+    /**
+     * Stops the AsyncEventSender as soon as the queue is empty
+     */
+    void flushThenStop() {
+        synchronized(eventQ) {
+            logger.info("AsyncEventSender.flushThenStop: flushing (size: {}) & stopping...", eventQ.size());
+            stopped = true;
+            eventQ.notifyAll();
+        }
+    }
+    
+    /** Main worker loop that dequeues from the eventQ and calls sendTopologyEvent with each **/
+    public void run() {
+        logger.info("AsyncEventSender.run: started.");
+        try{
+            while(true) {
+                try{
+                    final AsyncEvent asyncEvent;
+                    synchronized(eventQ) {
+                        isSending = false;
+                        while(!stopped && eventQ.isEmpty()) {
+                            try {
+                                eventQ.wait();
+                            } catch (InterruptedException e) {
+                                // issue a log debug but otherwise continue
+                                logger.debug("AsyncEventSender.run: interrupted while waiting for async events");
+                            }
+                        }
+                        if (stopped) {
+                            if (eventQ.isEmpty()) {
+                                // then we have flushed, so we can now finally stop
+                                logger.info("AsyncEventSender.run: flush finished. stopped.");
+                                return;
+                            } else {
+                                // otherwise the eventQ is not yet empty, so we are still in flush mode
+                                logger.info("AsyncEventSender.run: flushing another event. (pending {})", eventQ.size());
+                            }
+                        }
+                        asyncEvent = eventQ.remove(0);
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("AsyncEventSender.run: dequeued event {}, remaining: {}", asyncEvent, eventQ.size());
+                        }
+                        isSending = asyncEvent!=null;
+                    }
+                    if (asyncEvent!=null) {
+                        sendTopologyEvent(asyncEvent);
+                    }
+                } catch(Throwable th) {
+                    // Even though we should never catch Error or RuntimeException
+                    // here's the thinking about doing it anyway:
+                    //  * in case of a RuntimeException that would be less dramatic
+                    //    and catching it is less of an issue - we rather want
+                    //    the background thread to be able to continue than
+                    //    having it finished just because of a RuntimeException
+                    //  * catching an Error is of course not so nice.
+                    //    however, should we really give up this thread even in
+                    //    case of an Error? It could be an OOM or some other 
+                    //    nasty one, for sure. But even if. Chances are that
+                    //    other parts of the system would also get that Error
+                    //    if it is very dramatic. If not, then catching it
+                    //    sounds feasible. 
+                    // My two cents..
+                    // the goal is to avoid quitting the AsyncEventSender thread
+                    logger.error("AsyncEventSender.run: Throwable occurred. Sleeping 5sec. Throwable: "+th, th);
+                    try {
+                        Thread.sleep(5000);
+                    } catch (InterruptedException e) {
+                        logger.warn("AsyncEventSender.run: interrupted while sleeping");
+                    }
+                }
+            }
+        } finally {
+            logger.info("AsyncEventSender.run: quits (finally).");
+        }
+    }
+
+    /** Actual sending of the asynchronous event - catches RuntimeExceptions a listener can send. (Error is caught outside) **/
+    private void sendTopologyEvent(AsyncEvent asyncEvent) {
+        logger.trace("sendTopologyEvent: start");
+        final TopologyEventListener listener = asyncEvent.listener;
+        final TopologyEvent event = asyncEvent.event;
+        try{
+            logger.debug("sendTopologyEvent: sending to listener: {}, event: {}", listener, event);
+            listener.handleTopologyEvent(event);
+        } catch(final Exception e) {
+            logger.warn("sendTopologyEvent: handler threw exception. handler: "+listener+", exception: "+e, e);
+        }
+        logger.trace("sendTopologyEvent: start: listener: {}, event: {}", listener, event);
+    }
+
+    /** for testing only: checks whether there are any events being queued or sent **/
+    boolean hasInFlightEvent() {
+        synchronized(eventQ) {
+            return isSending || !eventQ.isEmpty();
+        }
+    }
+    
+}
\ No newline at end of file

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.Date;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.DiscoveryService;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** hooks into the ViewStateManagerImpl and adds a delay between
+ * TOPOLOGY_CHANGING and TOPOLOGY_CHANGED - with the idea to avoid
+ * bundle multiple TOPOLOGY_CHANGED events should they happen within
+ * a very short amount of time.
+ */
+class MinEventDelayHandler {
+
+    private final static Logger logger = LoggerFactory.getLogger(MinEventDelayHandler.class);
+
+    private boolean isDelaying = false;
+
+    private final Scheduler scheduler;
+
+    private final long minEventDelaySecs;
+
+    private DiscoveryService discoveryService;
+
+    private ViewStateManagerImpl viewStateManager;
+
+    private Lock lock;
+    
+    MinEventDelayHandler(ViewStateManagerImpl viewStateManager, Lock lock,
+            DiscoveryService discoveryService, Scheduler scheduler,
+            long minEventDelaySecs) {
+        this.viewStateManager = viewStateManager;
+        this.lock = lock;
+        if (discoveryService==null) {
+            throw new IllegalArgumentException("discoveryService must not be null");
+        }
+        this.discoveryService = discoveryService;
+        this.scheduler = scheduler;
+        if (minEventDelaySecs<=0) {
+            throw new IllegalArgumentException("minEventDelaySecs must be greater than 0 (is "+minEventDelaySecs+")");
+        }
+        this.minEventDelaySecs = minEventDelaySecs;
+    }
+
+    /**
+     * Asks the MinEventDelayHandler to handle the new view
+     * and return true if the caller shouldn't worry about any follow-up action -
+     * only if the method returns false should the caller do the usual 
+     * handleNewView action
+     */
+    boolean handlesNewView(BaseTopologyView newView) {
+        if (isDelaying) {
+            // already delaying, so we'll soon ask the DiscoveryServiceImpl for the
+            // latest view and go ahead then
+            logger.info("handleNewView: already delaying, ignoring new view meanwhile");
+            return true;
+        }
+        
+        if (!viewStateManager.hadPreviousView() 
+                || viewStateManager.isPropertiesDiff(newView) 
+                || viewStateManager.unchanged(newView)) {
+            logger.info("handleNewView: we never had a previous view, so we mustn't delay");
+            return false;
+        }
+        
+        // thanks to force==true this will always return true
+        if (!triggerAsyncDelaying(newView)) {
+            logger.info("handleNewView: could not trigger async delaying, sending new view now.");
+            viewStateManager.handleNewViewNonDelayed(newView);
+        }
+        return true;
+    }
+    
+    private boolean triggerAsyncDelaying(BaseTopologyView newView) {
+        final boolean triggered = runAfter(minEventDelaySecs /*seconds*/ , new Runnable() {
+    
+            public void run() {
+                lock.lock();
+                try{
+                    // unlock the CHANGED event for any subsequent call to handleTopologyChanged()
+                    isDelaying = false;
+
+                    // check if the new topology is already ready
+                    TopologyView t = discoveryService.getTopology();
+                    if (!(t instanceof BaseTopologyView)) {
+                        logger.error("asyncDelay.run: topology not of type BaseTopologyView: "+t);
+                        // cannot continue in this case
+                        return;
+                    }
+                    BaseTopologyView topology = (BaseTopologyView) t;
+                    
+                    if (topology.isCurrent()) {
+                        logger.info("asyncDelay.run: got new view: "+topology);
+                        viewStateManager.handleNewViewNonDelayed(topology);
+                    } else {
+                        logger.info("asyncDelay.run: new view (still) not current, delaying again");
+                        triggerAsyncDelaying(topology);
+                        // we're actually not interested in the result here
+                        // if the async part failed, then we have to rely
+                        // on a later handleNewView to come in - we can't
+                        // really send a view now cos it is not current.
+                        // so we're really stuck to waiting for handleNewView
+                        // in this case.
+                    }
+                } catch(RuntimeException re) {
+                    logger.error("RuntimeException: "+re, re);
+                    throw re;
+                } catch(Error er) {
+                    logger.error("Error: "+er, er);
+                    throw er;
+                } finally {
+                    lock.unlock();
+                }
+            }
+        });
+            
+        logger.info("triggerAsyncDelaying: asynch delaying of "+minEventDelaySecs+" triggered: "+triggered);
+        if (triggered) {
+            isDelaying = true;
+        }
+        return triggered;
+    }
+    
+    /**
+     * run the runnable after the indicated number of seconds, once.
+     * @return true if the scheduling of the runnable worked, false otherwise
+     */
+    private boolean runAfter(long seconds, final Runnable runnable) {
+        final Scheduler theScheduler = scheduler;
+        if (theScheduler == null) {
+            logger.info("runAfter: no scheduler set");
+            return false;
+        }
+        logger.trace("runAfter: trying with scheduler.fireJob");
+        final Date date = new Date(System.currentTimeMillis() + seconds * 1000);
+        try {
+            theScheduler.fireJobAt(null, runnable, null, date);
+            return true;
+        } catch (Exception e) {
+            logger.info("runAfter: could not schedule a job: "+e);
+            return false;
+        }
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.concurrent.locks.Lock;
+
+import org.apache.sling.discovery.commons.providers.ViewStateManager;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
+
+/**
+ * Used to create an implementation classes of type ViewStateManager
+ * (with the idea to be able to leave the implementation classes
+ * as package-protected)
+ */
+public class ViewStateManagerFactory {
+
+    public static ViewStateManager newViewStateManager(Lock lock, 
+            ConsistencyService consistencyService) {
+        return new ViewStateManagerImpl(lock, consistencyService);
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerImpl.java (from r1706814, sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerImpl.java?p2=sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerImpl.java&p1=sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java&r1=1706814&r2=1707548&rev=1707548&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java (original)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerImpl.java Thu Oct  8 14:31:45 2015
@@ -16,15 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.sling.discovery.commons.providers;
+package org.apache.sling.discovery.commons.providers.impl;
 
 import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.DiscoveryService;
+import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.discovery.TopologyEvent;
 import org.apache.sling.discovery.TopologyEvent.Type;
 import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.commons.InstancesDiff;
+import org.apache.sling.discovery.commons.InstancesDiff.InstanceCollection;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.EventFactory;
+import org.apache.sling.discovery.commons.providers.ViewStateManager;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,21 +47,18 @@ import org.slf4j.LoggerFactory;
  * the 'view state' (changing vs changed) and sending out the appropriate
  * and according TopologyEvents to the registered listeners.
  * <p>
- * Note that this class is completely unsynchronized and the idea is that
- * (since this class is only of interest to other implementors/providers of 
- * the discovery.api, not to users of the discovery.api however) that those
- * other implementors take care of proper synchronization. Without synchronization
- * this class is prone to threading issues! The most simple form of 
- * synchronization is to synchronize all of the public (non-static..) methods
- * with the same monitor object.
+ * Note re synchronization: this class rquires a lock object to be passed
+ * in the constructor - this will be applied to all public methods
+ * appropriately. Additionally, the ConsistencyService callback will
+ * also be locked using the provided lock object.
  */
-public class ViewStateManager {
+class ViewStateManagerImpl implements ViewStateManager {
 
-    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+    private static final Logger logger = LoggerFactory.getLogger(ViewStateManagerImpl.class);
     
     /** 
      * List of bound listeners that have already received their INIT event - others are in unInitializedEventListeners.
-     * @see ViewStateManager#unInitializedEventListeners
+     * @see ViewStateManagerImpl#unInitializedEventListeners
      */
     private List<TopologyEventListener> eventListeners = new ArrayList<TopologyEventListener>();
 
@@ -57,7 +68,7 @@ public class ViewStateManager {
      * <p>
      * This list becomes necessary for cases where the bind() happens before activate, or after activate but at a time
      * when the topology is TOPOLOGY_CHANGING - at which point an TOPOLOGY_INIT event can not yet be sent.
-     * @see ViewStateManager#eventListeners
+     * @see ViewStateManagerImpl#eventListeners
      */
     private List<TopologyEventListener> unInitializedEventListeners = new ArrayList<TopologyEventListener>();
     
@@ -66,10 +77,10 @@ public class ViewStateManager {
      * <p>
      * This controls whether handleChanging() and handleNewView() should cause any events
      * to be sent - which they do not if called before handleActivated() (or after handleDeactivated())
-     * @see ViewStateManager#handleActivated()
-     * @see ViewStateManager#handleChanging()
-     * @see ViewStateManager#handleNewView(BaseTopologyView)
-     * @see ViewStateManager#handleDeactivated()
+     * @see ViewStateManagerImpl#handleActivated()
+     * @see ViewStateManagerImpl#handleChanging()
+     * @see ViewStateManagerImpl#handleNewView(BaseTopologyView)
+     * @see ViewStateManagerImpl#handleDeactivated()
      */
     private boolean activated;
     
@@ -91,262 +102,520 @@ public class ViewStateManager {
      * When this goes false, a TOPOLOGY_CHANGED is sent.
      */
     private boolean isChanging;
+
+    /**
+     * The lock object with which all public methods are guarded - to be provided in the constructor.
+     */
+    protected final Lock lock;
+
+    /**
+     * An optional ConsistencyService can be provided in the constructor which, when set, will
+     * be invoked upon a new view becoming available (in handleNewView) and the actual
+     * TOPOLOGY_CHANGED event will only be sent once the ConsistencyService.sync method
+     * does the according callback (which can be synchronous or asynchronous again).
+     */
+    private final ConsistencyService consistencyService;
     
     /** 
-     * Binds the given eventListener, sending it an INIT event if applicable.
-     * <p>
-     * Note: no synchronization done in ViewStateManager, <b>must</b> be done externally
-     * @param eventListener the eventListener that is to bind
+     * A modification counter that increments on each of the following methods:
+     * <ul>
+     *  <li>handleActivated()</li>
+     *  <li>handleDeactivated()</li>
+     *  <li>handleChanging()</li>
+     *  <li>handleNewView()</li>
+     * </ul>
+     * with the intent that - when a consistencyService is set - the callback from the
+     * ConsistencyService can check if any of the above methods was invoked - and if so,
+     * it does not send the TOPOLOGY_CHANGED event due to those new facts that happened
+     * while it was synching with the repository.
      */
-    public void bind(final TopologyEventListener eventListener) {
+    private int modCnt = 0;
 
-        logger.debug("bind: Binding TopologyEventListener {}",
-                eventListener);
-        
-        if (eventListeners.contains(eventListener) || unInitializedEventListeners.contains(eventListener)) {
-            logger.info("bind: TopologyEventListener already registered: "+eventListener);
-            return;
+    /** SLING-4755 : reference to the background AsyncEventSender. Started/stopped in activate/deactivate **/
+    private AsyncEventSender asyncEventSender;
+    
+    /** SLING-5030 : this map contains the event last sent to each listener to prevent duplicate CHANGING events when scheduler is broken**/
+    private Map<TopologyEventListener,TopologyEvent.Type> lastEventMap = new HashMap<TopologyEventListener, TopologyEvent.Type>();
+
+    private MinEventDelayHandler minEventDelayHandler;
+
+    /**
+     * Creates a new ViewStateManager which synchronizes each method with the given
+     * lock and which optionally uses the given ConsistencyService to sync the repository
+     * upon handling a new view where an instances leaves the local cluster.
+     * @param lock the lock to be used - must not be null
+     * @param consistencyService optional (ie can be null) - the ConsistencyService to 
+     * sync the repository upon handling a new view where an instances leaves the local cluster.
+     */
+    ViewStateManagerImpl(Lock lock, ConsistencyService consistencyService) {
+        if (lock==null) {
+            throw new IllegalArgumentException("lock must not be null");
         }
+        this.lock = lock;
+        this.consistencyService = consistencyService;
+    }
 
-        if (activated) {
-            // check to see in which state we are
-            if (isChanging || (previousView==null)) {
-                // then we cannot send the TOPOLOGY_INIT at this point - need to delay this
-                unInitializedEventListeners.add(eventListener);
+    @Override
+    public void installMinEventDelayHandler(DiscoveryService discoveryService, Scheduler scheduler, long minEventDelaySecs) {
+        this.minEventDelayHandler = new MinEventDelayHandler(this, lock, discoveryService, scheduler, minEventDelaySecs);
+    }
+    
+    protected boolean hadPreviousView() {
+        return previousView!=null;
+    }
+    
+    protected boolean unchanged(BaseTopologyView newView) {
+        if (isChanging) {
+            return false;
+        }
+        if (previousView==null) {
+            return false;
+        }
+        return previousView.equals(newView);
+    }
+    
+    /* (non-Javadoc)
+     * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#bind(org.apache.sling.discovery.TopologyEventListener)
+     */
+    @Override
+    public void bind(final TopologyEventListener eventListener) {
+        logger.trace("bind: start {}", eventListener);
+        lock.lock();
+        try{
+            logger.debug("bind: Binding TopologyEventListener {}",
+                    eventListener);
+            
+            if (eventListeners.contains(eventListener) || unInitializedEventListeners.contains(eventListener)) {
+                logger.info("bind: TopologyEventListener already registered: "+eventListener);
+                return;
+            }
+    
+            if (activated) {
+                // check to see in which state we are
+                if (isChanging || (previousView==null)) {
+                    // then we cannot send the TOPOLOGY_INIT at this point - need to delay this
+                    if (logger.isDebugEnabled()) {
+                        logger.debug("bind: view is not yet/currently not defined (isChanging: "+isChanging+
+                                ", previousView==null: "+(previousView==null)+
+                                ", delaying INIT to "+eventListener);
+                    }
+                    unInitializedEventListeners.add(eventListener);
+                } else {
+                    // otherwise we can send the TOPOLOGY_INIT now
+                    logger.debug("bind: view is defined, sending INIT now to {}",
+                            eventListener);
+                    enqueue(eventListener, EventFactory.newInitEvent(previousView));
+                    eventListeners.add(eventListener);
+                }
             } else {
-                // otherwise we can send the TOPOLOGY_INIT now
-                sendEvent(eventListener, newInitEvent(previousView));
-                eventListeners.add(eventListener);
+                logger.debug("bind: not yet activated, delaying INIT to {}",
+                        eventListener);
+                unInitializedEventListeners.add(eventListener);
             }
-        } else {
-            unInitializedEventListeners.add(eventListener);
+        } finally {
+            lock.unlock();
+            logger.trace("bind: end");
         }
     }
     
-    /** 
-     * Unbinds the given eventListener, returning whether or not it was bound at all.
-     * <p>
-     * Note: no synchronization done in ViewStateManager, <b>must</b> be done externally
-     * @param eventListener the eventListner that is to unbind
-     * @return whether or not the listener was added in the first place 
+    /* (non-Javadoc)
+     * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#unbind(org.apache.sling.discovery.TopologyEventListener)
      */
+    @Override
     public boolean unbind(final TopologyEventListener eventListener) {
-
-        logger.debug("unbind: Releasing TopologyEventListener {}",
-                eventListener);
-
-        // even though a listener must always only ever exist in one of the two,
-        // the unbind we do - for safety-paranoia-reasons - remove them from both
-        final boolean a = eventListeners.remove(eventListener);
-        final boolean b = unInitializedEventListeners.remove(eventListener);
-        return a || b;
+        logger.trace("unbind: start {}", eventListener);
+        lock.lock();
+        try{
+            logger.debug("unbind: Releasing TopologyEventListener {}",
+                    eventListener);
+    
+            // even though a listener must always only ever exist in one of the two,
+            // the unbind we do - for safety-paranoia-reasons - remove them from both
+            final boolean a = eventListeners.remove(eventListener);
+            final boolean b = unInitializedEventListeners.remove(eventListener);
+            return a || b;
+        } finally {
+            lock.unlock();
+            logger.trace("unbind: end");
+        }
     }
     
-    /** Internal helper method that sends a given event to a list of listeners **/
-    private void sendEvent(final List<TopologyEventListener> audience, final TopologyEvent event) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("sendEvent: sending topologyEvent {}, to all ({}) listeners", event, audience.size());
+    private void enqueue(final TopologyEventListener da, final TopologyEvent event) {
+        logger.trace("enqueue: start: topologyEvent {}, to {}", event, da);
+        if (asyncEventSender==null) {
+            // this should never happen - sendTopologyEvent should only be called
+            // when activated
+            logger.warn("enqueue: asyncEventSender is null, cannot send event ({}, {})!", da, event);
+            return;
         }
+        if (lastEventMap.get(da)==event.getType() && event.getType()==Type.TOPOLOGY_CHANGING) {
+            // don't sent TOPOLOGY_CHANGING twice
+            logger.debug("enqueue: listener already got TOPOLOGY_CHANGING: {}", da);
+            return;
+        }
+        logger.debug("enqueue: enqueuing topologyEvent {}, to {}", event, da);
+        asyncEventSender.enqueue(da, event);
+        lastEventMap.put(da, event.getType());
+        logger.trace("enqueue: sending topologyEvent {}, to {}", event, da);
+    }
+
+    /** Internal helper method that sends a given event to a list of listeners **/
+    private void enqueueForAll(final List<TopologyEventListener> audience, final TopologyEvent event) {
+        logger.debug("enqueueForAll: sending topologyEvent {}, to all ({}) listeners", event, audience.size());
         for (Iterator<TopologyEventListener> it = audience.iterator(); it.hasNext();) {
             TopologyEventListener topologyEventListener = it.next();
-            sendEvent(topologyEventListener, event);
-        }
-        if (logger.isDebugEnabled()) {
-            logger.debug("sendEvent: sent topologyEvent {}, to all ({}) listeners", event, audience.size());
+            enqueue(topologyEventListener, event);
         }
+        logger.trace("enqueueForAll: sent topologyEvent {}, to all ({}) listeners", event, audience.size());
     }
     
-    /** Internal helper method that sends a given event to a particular listener **/
-    private void sendEvent(final TopologyEventListener da, final TopologyEvent event) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("sendEvent: sending topologyEvent {}, to {}", event, da);
-        }
+    /* (non-Javadoc)
+     * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#handleActivated()
+     */
+    @Override
+    public void handleActivated() {
+        logger.trace("handleActivated: start");
+        lock.lock();
         try{
-            da.handleTopologyEvent(event);
-        } catch(final Exception e) {
-            logger.warn("sendEvent: handler threw exception. handler: "+da+", exception: "+e, e);
+            logger.debug("handleActivated: activating the ViewStateManager");
+            activated = true;
+            modCnt++;
+            
+            // SLING-4755 : start the asyncEventSender in the background
+            //              will be stopped in deactivate (at which point
+            //              all pending events will still be sent but no
+            //              new events can be enqueued)
+            asyncEventSender = new AsyncEventSender();
+            Thread th = new Thread(asyncEventSender);
+            th.setName("Discovery-AsyncEventSender");
+            th.setDaemon(true);
+            th.start();
+
+            if (previousView!=null && !isChanging) {
+                enqueueForAll(unInitializedEventListeners, EventFactory.newInitEvent(previousView));
+                eventListeners.addAll(unInitializedEventListeners);
+                unInitializedEventListeners.clear();
+            }
+            logger.debug("handleActivated: activated the ViewStateManager");
+        } finally {
+            lock.unlock();
+            logger.trace("handleActivated: finally");
         }
     }
 
-    /** Note: no synchronization done in ViewStateManager, <b>must</b> be done externally **/
-    public void handleActivated() {
-        logger.debug("handleActivated: activating the ViewStateManager");
-        activated = true;
-        
-        if (previousView!=null && !isChanging) {
-            sendEvent(unInitializedEventListeners, newInitEvent(previousView));
-            eventListeners.addAll(unInitializedEventListeners);
-            unInitializedEventListeners.clear();
-        }
-        logger.debug("handleActivated: activated the ViewStateManager");
-    }
+    /* (non-Javadoc)
+     * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#handleDeactivated()
+     */
+    @Override
+    public void handleDeactivated() {
+        logger.trace("handleDeactivated: start");
+        lock.lock();
+        try{
+            logger.debug("handleDeactivated: deactivating the ViewStateManager");
+            activated = false;
+            modCnt++;
+    
+            if (asyncEventSender!=null) {
+                // it should always be not-null though
+                asyncEventSender.flushThenStop();
+                asyncEventSender = null;
+            }
 
-    /** Simple factory method for creating a TOPOLOGY_INIT event with the given newView **/
-    public static TopologyEvent newInitEvent(final BaseTopologyView newView) {
-        if (newView==null) {
-            throw new IllegalStateException("newView must not be null");
-        }
-        if (!newView.isCurrent()) {
-            throw new IllegalStateException("newView must be current");
+            if (previousView!=null) {
+                previousView.setNotCurrent();
+                logger.trace("handleDeactivated: setting previousView to null");
+                previousView = null;
+            }
+            logger.trace("handleDeactivated: setting isChanging to false");
+            isChanging = false;
+            
+            eventListeners.clear();
+            unInitializedEventListeners.clear();
+            logger.debug("handleDeactivated: deactivated the ViewStateManager");
+        } finally {
+            lock.unlock();
+            logger.trace("handleDeactivated: finally");
         }
-        return new TopologyEvent(Type.TOPOLOGY_INIT, null, newView);
     }
     
-    /** Simple factory method for creating a TOPOLOGY_CHANGING event with the given oldView **/
-    public static TopologyEvent newChangingEvent(final BaseTopologyView oldView) {
-        if (oldView==null) {
-            throw new IllegalStateException("oldView must not be null");
-        }
-        if (oldView.isCurrent()) {
-            throw new IllegalStateException("oldView must not be current");
+    /* (non-Javadoc)
+     * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#handleChanging()
+     */
+    @Override
+    public void handleChanging() {
+        logger.trace("handleChanging: start");
+        lock.lock();
+        try{
+    
+            if (isChanging) {
+                // if isChanging: then this is no news
+                // hence: return asap
+                logger.debug("handleChanging: was already changing - ignoring.");
+                return;
+            }
+            modCnt++;
+            
+            // whether activated or not: set isChanging to true now
+            logger.trace("handleChanging: setting isChanging to true");
+            isChanging = true;
+            
+            if (!activated) {
+                // if not activated: we can only start sending events once activated
+                // hence returning here - after isChanging was set to true accordingly
+                
+                // note however, that if !activated, there should be no eventListeners yet
+                // all of them should be in unInitializedEventListeners at the moment
+                // waiting for activate() and handleNewTopologyView
+                logger.debug("handleChanging: not yet activated - ignoring.");
+                return;
+            }
+            
+            if (previousView==null) {
+                // then nothing further to do - this is a very early changing event
+                // before even the first view was available
+                logger.debug("handleChanging: no previousView set - ignoring.");
+                return;
+            }
+            
+            logger.debug("handleChanging: sending TOPOLOGY_CHANGING to initialized listeners");
+            previousView.setNotCurrent();
+            enqueueForAll(eventListeners, EventFactory.newChangingEvent(previousView));
+        } finally {
+            lock.unlock();
+            logger.trace("handleChanging: finally");
         }
-        return new TopologyEvent(Type.TOPOLOGY_CHANGING, oldView, null);
     }
     
-    /** Simple factory method for creating a TOPOLOGY_CHANGED event with the given old and new views **/
-    public static TopologyEvent newChangedEvent(final BaseTopologyView oldView, final BaseTopologyView newView) {
-        if (oldView==null) {
-            throw new IllegalStateException("oldView must not be null");
-        }
-        if (oldView.isCurrent()) {
-            throw new IllegalStateException("oldView must not be current");
-        }
+    /* (non-Javadoc)
+     * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#handleNewView(org.apache.sling.discovery.commons.providers.BaseTopologyView)
+     */
+    @Override
+    public void handleNewView(final BaseTopologyView newView) {
+        logger.trace("handleNewView: start, newView={}", newView);
         if (newView==null) {
-            throw new IllegalStateException("newView must not be null");
+            throw new IllegalArgumentException("newView must not be null");
         }
         if (!newView.isCurrent()) {
-            throw new IllegalStateException("newView must be current");
+            logger.debug("handleNewView: newView is not current - calling handleChanging.");
+            handleChanging();
+            return;// false;
         }
-        return new TopologyEvent(Type.TOPOLOGY_CHANGED, oldView, newView);
+        // paranoia-testing:
+        InstanceDescription localInstance = newView.getLocalInstance();
+        if (localInstance==null) {
+            throw new IllegalStateException("newView does not contain the local instance - hence cannot be current");
+        }
+        if (!localInstance.isLocal()) {
+            throw new IllegalStateException("newView's local instance is not isLocal - very unexpected - hence cannot be current");
+        }
+        logger.debug("handleNewView: newView is current, so trying with minEventDelayHandler...");
+        if (minEventDelayHandler!=null) {
+            if (minEventDelayHandler.handlesNewView(newView)) {
+                return;// true;
+            }
+        }
+        logger.debug("handleNewView: minEventDelayHandler not set or not applicable this time, invoking hanldeNewViewNonDelayed...");
+        /*return */handleNewViewNonDelayed(newView);
     }
 
-    /** 
-     * Must be called when the corresponding service (typically a DiscoveryService implementation)
-     * is deactivated.
-     * <p>
-     * Will mark this manager as deactivated and flags the last available view as not current.
-     * <p>
-     * Note: no synchronization done in ViewStateManager, <b>must</b> be done externally 
-     */
-    public void handleDeactivated() {
-        logger.debug("handleDeactivated: deactivating the ViewStateManager");
-        activated = false;
-
-        if (previousView!=null) {
-            previousView.setNotCurrent();
-            previousView = null;
+    boolean handleNewViewNonDelayed(final BaseTopologyView newView) {
+        logger.trace("handleNewViewNonDelayed: start");
+        lock.lock();
+        try{
+            logger.debug("handleNewViewNonDelayed: start, newView={}", newView);
+            if (!newView.isCurrent()) {
+                logger.error("handleNewViewNonDelayed: newView must be current");
+                throw new IllegalArgumentException("newView must be current");
+            }
+            modCnt++;
+            
+            if (!isChanging) {
+                // verify if there is actually a change between previousView and newView
+                // if there isn't, then there is not much point in sending a CHANGING/CHANGED tuple
+                // at all
+                if (previousView!=null && previousView.equals(newView)) {
+                    // then nothing to send - the view has not changed, and we haven't
+                    // sent the CHANGING event - so we should not do anything here
+                    logger.debug("handleNewViewNonDelayed: we were not in changing state and new view matches old, so - ignoring");
+                    return false;
+                }
+                logger.debug("handleNewViewNonDelayed: implicitly triggering a handleChanging as we were not in changing state");
+                handleChanging();
+                logger.debug("handleNewViewNonDelayed: implicitly triggering of a handleChanging done");
+            }
+                
+            if (!activated) {
+                // then all we can do is to pass this on to previoueView
+                logger.trace("handleNewViewNonDelayed: setting previousView to {}", newView);
+                previousView = newView;
+                // plus set the isChanging flag to false
+                logger.trace("handleNewViewNonDelayed: setting isChanging to false");
+                isChanging = false;
+                
+                // other than that, we can't currently send any event, before activate
+                logger.debug("handleNewViewNonDelayed: not yet activated - ignoring");
+                return true;
+            }
+            
+            // now check if the view indeed changed or if it was just the properties
+            if (!isChanging && isPropertiesDiff(newView)) {
+                // well then send a properties changed event only
+                // and that one does not go via consistencyservice
+                logger.info("handleNewViewNonDelayed: properties changed to: "+newView);
+                previousView.setNotCurrent();
+                enqueueForAll(eventListeners, EventFactory.newPropertiesChangedEvent(previousView, newView));
+                logger.trace("handleNewViewNonDelayed: setting previousView to {}", newView);
+                previousView = newView;
+                return true;
+            }
+            
+            final boolean invokeConsistencyService;
+            if (consistencyService==null) {
+                logger.debug("handleNewViewNonDelayed: no consistencyService set - continuing directly.");
+                invokeConsistencyService = false;
+            } else if (previousView==null) {
+                // when there was no previous view, we cannot determine if
+                // any instance left
+                // so for safety reason: always invoke the consistencyservice
+                logger.debug("handleNewViewNonDelayed: no previousView set - invoking consistencyService");
+                invokeConsistencyService = true;
+            } else {
+                final InstancesDiff diff = new InstancesDiff(previousView, newView);
+                InstanceCollection removed = diff.removed();
+//                Collection<InstanceDescription> c = removed.get();
+//                Iterator<InstanceDescription> it = c.iterator();
+//                while(it.hasNext()) {
+//                    logger.info("handleNewViewNonDelayed: removed: "+it.next());
+//                }
+                InstanceCollection inClusterView = removed.
+                        isInClusterView(newView.getLocalInstance().getClusterView());
+//                c = removed.get();
+//                it = c.iterator();
+//                while(it.hasNext()) {
+//                    logger.info("handleNewViewNonDelayed: inClusterView: "+it.next());
+//                }
+                final boolean anyInstanceLeftLocalCluster = inClusterView.
+                        get().size()>0;
+                if (anyInstanceLeftLocalCluster) {
+                    logger.debug("handleNewViewNonDelayed: anyInstanceLeftLocalCluster=true, hence invoking consistencyService next");
+                } else {
+                    logger.debug("handleNewViewNonDelayed: anyInstanceLeftLocalCluster=false - continuing directly.");
+                }
+                invokeConsistencyService = anyInstanceLeftLocalCluster;
+            }
+                        
+            if (invokeConsistencyService) {
+                // if "instances from the local cluster have been removed"
+                // then:
+                // run the set consistencyService
+                final int lastModCnt = modCnt;
+                logger.debug("handleNewViewNonDelayed: invoking consistencyService (modCnt={})", modCnt);
+                consistencyService.sync(newView,
+                        new Runnable() {
+                    
+                    public void run() {
+                        logger.trace("consistencyService.callback.run: start. acquiring lock...");
+                        lock.lock();
+                        try{
+                            logger.debug("consistencyService.callback.run: lock aquired. (modCnt should be {}, is {})", lastModCnt, modCnt);
+                            if (modCnt!=lastModCnt) {
+                                logger.debug("consistencyService.callback.run: modCnt changed (from {} to {}) - ignoring",
+                                        lastModCnt, modCnt);
+                                return;
+                            }
+                            // else:
+                            doHandleConsistent(newView);
+                        } finally {
+                            lock.unlock();
+                            logger.trace("consistencyService.callback.run: end.");
+                        }
+                    }
+                    
+                });
+            } else {
+                // otherwise we're either told not to use any ConsistencyService
+                // or using it is not applicable at this stage - so continue
+                // with sending the TOPOLOGY_CHANGED (or TOPOLOGY_INIT if there
+                // are any newly bound topology listeners) directly
+                doHandleConsistent(newView);
+            }
+            logger.debug("handleNewViewNonDelayed: end");
+            return true;
+        } finally {
+            lock.unlock();
+            logger.trace("handleNewViewNonDelayed: finally");
         }
-        isChanging = false;
-        
-        eventListeners.clear();
-        unInitializedEventListeners.clear();
-        logger.debug("handleDeactivated: deactivated the ViewStateManager");
     }
-    
-    /** Note: no synchronization done in ViewStateManager, <b>must</b> be done externally **/
-    public void handleChanging() {
-        logger.debug("handleChanging: start");
 
-        if (isChanging) {
-            // if isChanging: then this is no news
-            // hence: return asap
-            logger.debug("handleChanging: was already changing - ignoring.");
-            return;
+    protected boolean isPropertiesDiff(BaseTopologyView newView) {
+        if (previousView==null) {
+            return false;
         }
-        
-        // whether activated or not: set isChanging to true now
-        isChanging = true;
-        
-        if (!activated) {
-            // if not activated: we can only start sending events once activated
-            // hence returning here - after isChanging was set to true accordingly
-            
-            // note however, that if !activated, there should be no eventListeners yet
-            // all of them should be in unInitializedEventListeners at the moment
-            // waiting for activate() and handleNewTopologyView
-            logger.debug("handleChanging: not yet activated - ignoring.");
-            return;
+        if (newView==null) {
+            throw new IllegalArgumentException("newView must not be null");
         }
-        
-        if (previousView==null) {
-            // then nothing further to do - this is a very early changing event
-            // before even the first view was available
-            logger.debug("handleChanging: no previousView set - ignoring.");
-            return;
+        if (previousView.getInstances().size()!=newView.getInstances().size()) {
+            return false;
         }
-        
-        logger.debug("handleChanging: sending TOPOLOGY_CHANGING to initialized listeners");
-        previousView.setNotCurrent();
-        sendEvent(eventListeners, newChangingEvent(previousView));
-        logger.debug("handleChanging: end");
-    }
-    
-    /** Note: no synchronization done in ViewStateManager, <b>must</b> be done externally **/
-    public void handleNewView(BaseTopologyView newView) {
-        if (logger.isDebugEnabled()) {
-            logger.debug("handleNewView: start, newView={}", newView);
+        if (previousView.equals(newView)) {
+            return false;
         }
-        if (!newView.isCurrent()) {
-            logger.error("handleNewView: newView must be current");
-            throw new IllegalArgumentException("newView must be current");
+        Set<String> newIds = new HashSet<String>();
+        for(InstanceDescription newInstance : newView.getInstances()) {
+            newIds.add(newInstance.getSlingId());
         }
         
-        if (!isChanging) {
-            // verify if there is actually a change between previousView and newView
-            // if there isn't, then there is not much point in sending a CHANGING/CHANGED tuple
-            // at all
-            if (previousView!=null && previousView.equals(newView)) {
-                // then nothing to send - the view has not changed, and we haven't
-                // sent the CHANGING event - so we should not do anything here
-                logger.debug("handleNewView: we were not in changing state and new view matches old, so - ignoring");
-                return;
+        for(InstanceDescription oldInstance : previousView.getInstances()) {
+            if (!newIds.contains(oldInstance.getSlingId())) {
+                return false;
             }
-            logger.debug("handleNewView: simulating a handleChanging as we were not in changing state");
-            handleChanging();
-            logger.debug("handleNewView: simulation of a handleChanging done");
         }
+        return true;
+    }
 
-        // whether activated or not: set isChanging to false, first thing
-        isChanging = false;
-        
-        if (!activated) {
-            // then all we can do is to pass this on to previoueView
-            previousView = newView;
-            // other than that, we can't currently send any event, before activate
-            logger.debug("handleNewView: not yet activated - ignoring");
-            return;
-        }
+    private void doHandleConsistent(BaseTopologyView newView) {
+        logger.trace("doHandleConsistent: start");
         
+        // unset the isChanging flag
+        logger.trace("doHandleConsistent: setting isChanging to false");
+        isChanging = false;
+
         if (previousView==null) {
             // this is the first time handleNewTopologyView is called
             
             if (eventListeners.size()>0) {
-                logger.info("handleNewTopologyView: no previous view available even though listeners already got CHANGED event");
+                logger.info("doHandleConsistent: no previous view available even though listeners already got CHANGED event");
+            } else {
+                logger.debug("doHandleConsistent: no previous view and there are no event listeners yet. very quiet.");
             }
-            
+
             // otherwise this is the normal case where there are uninitialized event listeners waiting below
-                
+
         } else {
-            logger.debug("handleNewView: sending TOPOLOGY_CHANGED to initialized listeners");
+            logger.debug("doHandleConsistent: sending TOPOLOGY_CHANGED to initialized listeners");
             previousView.setNotCurrent();
-            sendEvent(eventListeners, newChangedEvent(previousView, newView));
+            enqueueForAll(eventListeners, EventFactory.newChangedEvent(previousView, newView));
         }
         
         if (unInitializedEventListeners.size()>0) {
             // then there were bindTopologyEventListener calls coming in while
             // we were in CHANGING state - so we must send those the INIT they were 
             // waiting for oh so long
-            if (logger.isDebugEnabled()) {
-                logger.debug("handleNewView: sending TOPOLOGY_INIT to uninitialized listeners ({})", 
-                        unInitializedEventListeners.size());
-            }
-            sendEvent(unInitializedEventListeners, newInitEvent(newView));
+            logger.debug("doHandleConsistent: sending TOPOLOGY_INIT to uninitialized listeners ({})", 
+                    unInitializedEventListeners.size());
+            enqueueForAll(unInitializedEventListeners, EventFactory.newInitEvent(newView));
             eventListeners.addAll(unInitializedEventListeners);
             unInitializedEventListeners.clear();
         }
         
+        logger.trace("doHandleConsistent: setting previousView to {}", newView);
         previousView = newView;
-        logger.debug("handleNewView: end");
+        logger.trace("doHandleConsistent: end");
+    }
+
+    /** get-hook for testing only! **/
+    AsyncEventSender getAsyncEventSender() {
+        return asyncEventSender;
     }
     
 }

Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.spi;
+
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+
+/**
+ * The ConsistencyService can be used to establish strong
+ * consistency with the underlying (eventually consistent) repository in use.
+ * <p>
+ * The issue is described in length in SLING-4627 - the short
+ * version is composed of two different factors:
+ * <ul>
+ * <li>concurrency of discovery service and its listeners on the 
+ * different instances: upon a change in the topology it is 
+ * important that one listener doesn't do activity based on
+ * an older incarnation of the topologyView than another listener
+ * on another instance. they should change from one view to the
+ * next view based on the same repository state.</li>
+ * </li>
+ * <li>when an instance leaves the cluster (eg crashes), then 
+ * depending on the repository it might have left a backlog around
+ * which would yet have to be processed and which could contain
+ * relevant topology-dependent data that should be waited for
+ * to settle before the topology-dependent activity can continue
+ * </li>
+ * </ul>
+ * Both of these two aspects are handled by this ConsistencyService.
+ * The former one by introducing a 'sync token' that gets written
+ * to the repository and on receiving it by the peers they know
+ * that the writing instance is aware of the ongoing change, that
+ * the writing instance has sent out TOPOLOGY_CHANGING and that
+ * the receiving instance has seen all changes that the writing
+ * instance did prior to sending a TOPOLOGY_CHANGING.
+ * The latter aspect is achieved by making use of the underlying
+ * repository: eg on Oak the 'discovery lite' descriptor is
+ * used to determine if any instance not part of the new view
+ * is still being deactivated (eg has backlog). So this second
+ * part is repository dependent.
+ */
+public interface ConsistencyService {
+
+    /**
+     * Starts the synchronization process and calls the provided
+     * callback upon completion.
+     * <p>
+     * sync() is not thread-safe and should not be invoked 
+     * concurrently.
+     * <p>
+     * If sync() gets called before a previous invocation finished,
+     * that previous invocation will be discarded, ie the callback
+     * of the previous invocation will no longer be called.
+     * <p>
+     * The synchronization process consists of making sure that
+     * the repository has processed any potential backlog of instances
+     * that are no longer part of the provided, new view. Plus 
+     * it writes a 'sync-token' to a well-defined location, with
+     * all peers doing the same, and upon seeing all other sync-tokens
+     * declares successful completion - at which point it calls the
+     * callback.run().
+     * @param view the view which all instances in the local cluster
+     * should agree on having seen
+     * @param callback the runnable which should be called after
+     * successful syncing
+     */
+    void sync(BaseTopologyView view, Runnable callback);
+    
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java
------------------------------------------------------------------------------
    svn:eol-style = native