You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2015/10/08 16:31:46 UTC

svn commit: r1707548 [2/4] - in /sling/trunk/bundles/extensions/discovery/commons: ./ src/main/java/org/apache/sling/discovery/commons/providers/ src/main/java/org/apache/sling/discovery/commons/providers/impl/ src/main/java/org/apache/sling/discovery/...

Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.spi.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jcr.Session;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Inherits the 'sync-token' part from the SyncTokenConsistencyService
+ * and adds the 'wait while backlog' part to it, based on
+ * the Oak discovery-lite descriptor.
+ * <p>
+ * The service maintains a mapping between the local slingId and the
+ * 'me' id of the discovery-lite descriptor in a shared resource, and
+ * uses that mapping to decide whether all instances which the descriptor
+ * lists as 'active' are part of the topology view that is being synced.
+ */
+public class OakSyncTokenConsistencyService extends SyncTokenConsistencyService {
+
+    /** path of the resource holding the slingId to discovery-lite-id mapping */
+    private static final String IDMAP_PATH = "/var/discovery/commons/idmap";
+
+    static enum BacklogStatus {
+        UNDEFINED /* when there was an error retrieving the backlog status with oak */,
+        HAS_BACKLOG /* when oak's discovery lite descriptor indicated that there is still some backlog */,
+        NO_BACKLOG /* when oak's discovery lite descriptor declared we're backlog-free now */
+    }
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /** TODO: avoid hardcoding the constant here but use an Oak constant class instead if possible */
+    public static final String OAK_DISCOVERYLITE_CLUSTERVIEW = "oak.discoverylite.clusterview";
+
+    /**
+     * Set to true once the local slingId has been stored in the idmap.
+     * Volatile as it is written and read by different background-check threads.
+     */
+    private volatile boolean initialized = false;
+
+    private final long waitWhileBacklogTimeoutMillis;
+
+    /**
+     * Creates the service and starts a background check which keeps
+     * retrying the idmap initialization until it succeeds (or until the
+     * check gets cancelled by a subsequent background check).
+     * Failures during that initialization are logged, not thrown.
+     *
+     * @param resourceResolverFactory the factory used for all repository access
+     * @param slingId the local slingId
+     * @param syncTokenTimeoutMillis timeout value in millis after which the
+     * sync-token process is cancelled - or -1 if no timeout should be used there
+     * @param waitWhileBacklogTimeoutMillis timeout value in millis after which
+     * the waiting-while-backlog should be cancelled - or -1 if no timeout should be
+     * used there
+     */
+    public OakSyncTokenConsistencyService(ResourceResolverFactory resourceResolverFactory,
+            String slingId, long syncTokenTimeoutMillis, long waitWhileBacklogTimeoutMillis) {
+        super(resourceResolverFactory, slingId, syncTokenTimeoutMillis);
+        this.waitWhileBacklogTimeoutMillis = waitWhileBacklogTimeoutMillis;
+        // a no-op callback is passed rather than null so that an initialization
+        // which succeeds on the very first check cannot end in a
+        // NullPointerException, irrespective of whether startBackgroundCheck
+        // null-checks its callback
+        startBackgroundCheck("idmap-initializer", new BackgroundCheck() {
+
+            @Override
+            public boolean check() {
+                return ensureInitialized();
+            }
+        }, new Runnable() {
+
+            @Override
+            public void run() {
+                // nothing to do once the idmap is initialized
+            }
+        }, -1);
+    }
+
+    /**
+     * Runs {@link #init()} unless a previous run already succeeded.
+     *
+     * @return true if the idmap is (now) initialized, false if the
+     * initialization could not be done yet or failed (failures are logged)
+     */
+    private boolean ensureInitialized() {
+        if (initialized) {
+            return true;
+        }
+        logger.info("ensureInitialized: initializing.");
+        try {
+            initialized = init();
+            return initialized;
+        } catch (LoginException e) {
+            logger.error("ensureInitialized: could not login: "+e, e);
+            return false;
+        } catch (JSONException e) {
+            logger.error("ensureInitialized: got JSONException: "+e, e);
+            return false;
+        } catch (PersistenceException e) {
+            logger.error("ensureInitialized: got PersistenceException: "+e, e);
+            return false;
+        }
+    }
+
+    /**
+     * Reads the 'me' id from the discovery-lite descriptor and stores it
+     * in the idmap resource under the local slingId.
+     *
+     * @return true if the mapping was stored, false if the descriptor
+     * (or a usable 'me' value or a modifiable idmap) is not available
+     * @throws LoginException when no resource resolver could be obtained
+     * @throws JSONException when the descriptor is not proper json or has no 'me'
+     * @throws PersistenceException when the idmap could not be created or committed
+     */
+    private boolean init() throws LoginException, JSONException, PersistenceException {
+        ResourceResolver resourceResolver = null;
+        try{
+            resourceResolver = getResourceResolver();
+            JSONObject descriptor = getDescriptor(resourceResolver);
+            if (descriptor == null) {
+                logger.info("init: could not yet get descriptor '"+OAK_DISCOVERYLITE_CLUSTERVIEW+"'!");
+                return false;
+            }
+            Object meObj = descriptor.get("me");
+            // instanceof is false for null, hence no separate null check
+            if (!(meObj instanceof Number)) {
+                logger.info("init: 'me' value of descriptor not a Number: "+meObj+" (descriptor: "+descriptor+")");
+                return false;
+            }
+            Number me = (Number)meObj;
+            final Resource resource = getOrCreateResource(resourceResolver, IDMAP_PATH);
+            ModifiableValueMap idmap = resource.adaptTo(ModifiableValueMap.class);
+            if (idmap == null) {
+                // adaptTo may return null - treat as 'not yet' so that the init is retried
+                logger.warn("init: could not adapt '"+IDMAP_PATH+"' to a ModifiableValueMap");
+                return false;
+            }
+            idmap.put(slingId, me.longValue());
+            resourceResolver.commit();
+            logger.info("init: mapped slingId="+slingId+" to discoveryLiteId="+me);
+            return true;
+        } finally {
+            if (resourceResolver!=null) {
+                resourceResolver.close();
+            }
+        }
+    }
+
+    /**
+     * First waits until there is no backlog (anymore) and then
+     * does the sync-token part inherited from the parent class.
+     *
+     * @param view the view that is being synced
+     * @param callback passed on to the sync-token part once the
+     * wait-for-backlog part is done
+     */
+    @Override
+    public void sync(final BaseTopologyView view, final Runnable callback) {
+        // cancel the previous backgroundCheck if it's still running
+        cancelPreviousBackgroundCheck();
+
+        // first do the wait-for-backlog part
+        logger.info("sync: doing wait-for-backlog part for view="+view);
+        waitWhileBacklog(view, new Runnable() {
+
+            @Override
+            public void run() {
+                // when done, then do the sync-token part
+                logger.info("sync: doing sync-token part for view="+view);
+                syncToken(view, callback);
+            }
+
+        });
+    }
+
+    /**
+     * Starts a background check that succeeds as soon as the service is
+     * initialized and the backlog status is NO_BACKLOG; the runnable is
+     * handed to the background check as its callback.
+     */
+    private void waitWhileBacklog(final BaseTopologyView view, final Runnable runnable) {
+        // start backgroundChecking until the backlogStatus
+        // is NO_BACKLOG
+        startBackgroundCheck("OakSyncTokenConsistencyService-waitWhileBacklog", new BackgroundCheck() {
+
+            @Override
+            public boolean check() {
+                if (!ensureInitialized()) {
+                    logger.info("waitWhileBacklog: could not initialize...");
+                    return false;
+                }
+                BacklogStatus backlogStatus = getBacklogStatus(view);
+                if (backlogStatus == BacklogStatus.NO_BACKLOG) {
+                    logger.info("waitWhileBacklog: no backlog (anymore), done.");
+                    return true;
+                } else {
+                    logger.info("waitWhileBacklog: backlogStatus still "+backlogStatus);
+                    return false;
+                }
+            }
+        }, runnable, waitWhileBacklogTimeoutMillis);
+    }
+
+    /**
+     * Determines the backlog status by comparing the discovery-lite
+     * descriptor with the idmap and the local cluster of the given view.
+     *
+     * @return NO_BACKLOG or HAS_BACKLOG when that could be determined,
+     * UNDEFINED when the descriptor or the idmap could not be read, the
+     * descriptor is malformed or an active id has no slingId mapping (yet)
+     */
+    private BacklogStatus getBacklogStatus(BaseTopologyView view) {
+        logger.trace("getBacklogStatus: start");
+        ResourceResolver resourceResolver = null;
+        try{
+            resourceResolver = getResourceResolver();
+            JSONObject descriptor = getDescriptor(resourceResolver);
+            if (descriptor == null) {
+                logger.warn("getBacklogStatus: could not get descriptor '"+OAK_DISCOVERYLITE_CLUSTERVIEW+"'!");
+                return BacklogStatus.UNDEFINED;
+            }
+            // backlog-free means:
+            // 1) 'deactivating' must be empty
+            //     (otherwise we indeed have a backlog)
+            // 2) all active ids of the descriptor must have a mapping to slingIds
+            //     (otherwise the init failed or is pending for some instance(s))
+            // 3) all 'active' instances must be in the view
+            //     (otherwise discovery lite might not yet consider
+            //     an instance dead but discovery-service does)
+            // instead what is fine from a backlog point of view
+            // * instances in the view but listed as 'inactive'
+            //     (this might be the case for just-started instances)
+            // * instances in the view but not contained in the descriptor at all
+            //     (this might be the case for just-started instances)
+
+            // getJSONArray raises a JSONException (handled below) for a missing
+            // or non-array value, where an unchecked cast would escape as a
+            // ClassCastException
+            JSONArray active = descriptor.getJSONArray("active");
+            JSONArray deactivating = descriptor.getJSONArray("deactivating");
+            // we're not worried about 'inactive' ones - as that could
+            // be a larger list filled with legacy entries too
+            // plus once the instance is inactive there's no need to
+            // check anything further - that one is then backlog-free
+
+            // 1) 'deactivating' must be empty
+            if (deactivating.length()!=0) {
+                logger.info("getBacklogStatus: there are deactivating instances: "+deactivating);
+                return BacklogStatus.HAS_BACKLOG;
+            }
+
+            Resource resource = getOrCreateResource(resourceResolver, IDMAP_PATH);
+            ValueMap idmapValueMap = resource.adaptTo(ValueMap.class);
+            if (idmapValueMap == null) {
+                logger.warn("getBacklogStatus: could not adapt '"+IDMAP_PATH+"' to a ValueMap");
+                return BacklogStatus.UNDEFINED;
+            }
+            ClusterView cluster = view.getLocalInstance().getClusterView();
+            Set<String> slingIdsInView = new HashSet<String>();
+            for (InstanceDescription instance : cluster.getInstances()) {
+                slingIdsInView.add(instance.getSlingId());
+            }
+            // invert the idmap: discovery-lite id -> slingId
+            // (non-numeric properties of the resource are skipped)
+            Map<Long, String> idmap = new HashMap<Long, String>();
+            for (String mappedSlingId : idmapValueMap.keySet()) {
+                Object value = idmapValueMap.get(mappedSlingId);
+                if (value instanceof Number) {
+                    idmap.put(((Number) value).longValue(), mappedSlingId);
+                }
+            }
+
+            for(int i=0; i<active.length(); i++) {
+                Object activeObj = active.get(i);
+                if (!(activeObj instanceof Number)) {
+                    logger.warn("getBacklogStatus: active id is not a Number: "+activeObj+" (descriptor: "+descriptor+")");
+                    return BacklogStatus.UNDEFINED;
+                }
+                Number activeId = (Number) activeObj;
+                String activeSlingId = idmap.get(activeId.longValue());
+                // 2) all ids of the descriptor must have a mapping to slingIds
+                if (activeSlingId == null) {
+                    logger.info("getBacklogStatus: no slingId found for active id: "+activeId);
+                    return BacklogStatus.UNDEFINED;
+                }
+                // 3) all 'active' instances must be in the view
+                if (!slingIdsInView.contains(activeSlingId)) {
+                    logger.info("getBacklogStatus: active instance's ("+activeId+") slingId ("+activeSlingId+") not found in cluster ("+cluster+")");
+                    return BacklogStatus.HAS_BACKLOG;
+                }
+            }
+
+            logger.info("getBacklogStatus: no backlog (anymore)");
+            return BacklogStatus.NO_BACKLOG;
+        } catch (LoginException e) {
+            logger.error("getBacklogStatus: could not login: "+e, e);
+            return BacklogStatus.UNDEFINED;
+        } catch (JSONException e) {
+            logger.error("getBacklogStatus: got JSONException: "+e, e);
+            return BacklogStatus.UNDEFINED;
+        } catch (PersistenceException e) {
+            logger.error("getBacklogStatus: got PersistenceException: "+e, e);
+            return BacklogStatus.UNDEFINED;
+        } finally {
+            logger.trace("getBacklogStatus: end");
+            if (resourceResolver!=null) {
+                resourceResolver.close();
+            }
+        }
+    }
+
+    /**
+     * Reads the discovery-lite descriptor from the repository and parses it as json.
+     * Example of such a descriptor:
+     * {"seq":8,"final":true,"id":"aae34e9a-b08d-409e-be10-9ff4106e5387","me":4,"active":[4],"deactivating":[],"inactive":[1,2,3]}
+     *
+     * @return the parsed descriptor, or null if the resolver could not be
+     * adapted to a Session or the repository has no such descriptor
+     * @throws JSONException when the descriptor is not proper json
+     */
+    private JSONObject getDescriptor(ResourceResolver resourceResolver) throws JSONException {
+        Session session = resourceResolver.adaptTo(Session.class);
+        if (session == null) {
+            return null;
+        }
+        String descriptorStr = session.getRepository().getDescriptor(OAK_DISCOVERYLITE_CLUSTERVIEW);
+        if (descriptorStr == null) {
+            return null;
+        }
+        return new JSONObject(descriptorStr);
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.spi.impl;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the 'sync-token' part of the ConsistencyService,
+ * but not the 'wait while backlog' part (which is left to subclasses
+ * if needed).
+ * <p>
+ * Each instance stores the sync-token id of the view under its slingId
+ * in a shared resource and then polls, in a background thread, until
+ * all instances of the local cluster have stored that same sync-token id.
+ */
+public class SyncTokenConsistencyService implements ConsistencyService {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /** path of the resource holding one sync-token property per slingId */
+    private static final String SYNCTOKEN_PATH = "/var/discovery/commons/synctokens";
+
+    private static final String DEFAULT_RESOURCE_TYPE = "sling:Folder";
+
+    /**
+     * Gets the resource at the given path, creating it (and intermediate
+     * resources) with the default resource type if needed, using auto-commit.
+     */
+    protected static Resource getOrCreateResource(
+            final ResourceResolver resourceResolver, final String path)
+            throws PersistenceException {
+        return ResourceUtil.getOrCreateResource(resourceResolver, path,
+                DEFAULT_RESOURCE_TYPE, DEFAULT_RESOURCE_TYPE, true);
+    }
+
+    /**
+     * Repeatedly runs a BackgroundCheck - roughly once per second - until
+     * it succeeds, the timeout is hit or the runnable gets cancelled.
+     * The callback (if any) is invoked on success and on timeout,
+     * but not when cancelled.
+     */
+    private final class BackgroundCheckRunnable implements Runnable {
+        private final Runnable callback;
+        private final BackgroundCheck check;
+        private final long timeoutMillis;
+        private volatile boolean cancelled;
+        private final String threadName;
+
+        // for testing only:
+        private final Object waitObj = new Object();
+        private int waitCnt;
+        private volatile boolean done;
+
+        private BackgroundCheckRunnable(Runnable callback,
+                BackgroundCheck check, long timeoutMillis, String threadName) {
+            this.callback = callback;
+            this.check = check;
+            this.timeoutMillis = timeoutMillis;
+            this.threadName = threadName;
+        }
+
+        @Override
+        public void run() {
+            logger.debug("backgroundCheck.run: start");
+            long start = System.currentTimeMillis();
+            // set right before the callback gets invoked, so that a
+            // RuntimeException thrown by the callback itself does not
+            // result in the callback being invoked a second time below
+            boolean callbackInvoked = false;
+            try{
+                while(!cancelled()) {
+                    if (check.check()) {
+                        if (callback != null) {
+                            callbackInvoked = true;
+                            callback.run();
+                        }
+                        return;
+                    }
+                    // compare elapsed time (rather than start + timeout)
+                    // to stay clear of overflows with very large timeouts
+                    if (timeoutMillis != -1 &&
+                            (System.currentTimeMillis() - start > timeoutMillis)) {
+                        if (callback == null) {
+                            logger.info("backgroundCheck.run: timeout hit (no callback to invoke)");
+                        } else {
+                            logger.info("backgroundCheck.run: timeout hit, invoking callback.");
+                            callbackInvoked = true;
+                            callback.run();
+                        }
+                        return;
+                    }
+                    logger.debug("backgroundCheck.run: waiting another sec.");
+                    synchronized(waitObj) {
+                        waitCnt++;
+                        try {
+                            // wake up a triggerCheck() caller waiting for this iteration
+                            waitObj.notify();
+                            waitObj.wait(1000);
+                        } catch (InterruptedException e) {
+                            // the interrupt flag is deliberately not restored here:
+                            // the loop carries on, and with the flag set the next
+                            // wait() would throw immediately, turning this into a busy loop
+                            logger.info("backgroundCheck.run: got interrupted");
+                        }
+                    }
+                }
+                logger.debug("backgroundCheck.run: this run got cancelled. {}", check);
+            } catch(RuntimeException re) {
+                logger.error("backgroundCheck.run: RuntimeException: "+re, re);
+                // nevertheless calling runnable.run in this case
+                // (unless it was the callback itself that failed)
+                if (callback != null && !callbackInvoked) {
+                    logger.info("backgroundCheck.run: RuntimeException -> invoking callback");
+                    callback.run();
+                }
+                throw re;
+            } catch(Error er) {
+                logger.error("backgroundCheck.run: Error: "+er, er);
+                // not calling runnable.run in this case!
+                // since Error is typically severe
+                logger.info("backgroundCheck.run: NOT invoking callback");
+                throw er;
+            } finally {
+                logger.debug("backgroundCheck.run: end");
+                synchronized(waitObj) {
+                    done = true;
+                    waitObj.notify();
+                }
+            }
+        }
+
+        boolean cancelled() {
+            return cancelled;
+        }
+
+        void cancel() {
+            logger.info("cancel: "+threadName);
+            cancelled = true;
+        }
+
+        /**
+         * For testing only: wakes up the background thread and blocks
+         * until it has either completed another iteration or is done.
+         */
+        public void triggerCheck() {
+            synchronized(waitObj) {
+                int waitCntAtStart = waitCnt;
+                waitObj.notify();
+                while(!done && waitCnt<=waitCntAtStart) {
+                    try {
+                        waitObj.wait();
+                    } catch (InterruptedException e) {
+                        // keep the interrupt status and the cause for the caller
+                        Thread.currentThread().interrupt();
+                        throw new RuntimeException("got interrupted", e);
+                    }
+                }
+            }
+        }
+    }
+
+    /** A condition that is polled until it returns true. */
+    interface BackgroundCheck {
+
+        boolean check();
+
+    }
+
+    protected final ResourceResolverFactory resourceResolverFactory;
+
+    protected final String slingId;
+
+    private final long syncTokenTimeoutMillis;
+
+    /**
+     * The most recently started background check (which might have
+     * terminated or been cancelled already). Volatile as it is set and
+     * read both by caller threads and by background-check threads.
+     */
+    protected volatile BackgroundCheckRunnable backgroundCheckRunnable;
+
+    /**
+     * @param resourceResolverFactory the factory used for all repository access
+     * @param slingId the local slingId
+     * @param syncTokenTimeoutMillis timeout value in millis after which the
+     * sync-token process is cancelled - or -1 if no timeout should be used there
+     * @throws IllegalArgumentException if resourceResolverFactory is null
+     * or slingId is null or empty
+     */
+    public SyncTokenConsistencyService(ResourceResolverFactory resourceResolverFactory,
+            String slingId, long syncTokenTimeoutMillis) {
+        if (resourceResolverFactory == null) {
+            throw new IllegalArgumentException("resourceResolverFactory must not be null");
+        }
+        if (slingId == null || slingId.length() == 0) {
+            throw new IllegalArgumentException("slingId must not be null or empty: "+slingId);
+        }
+        this.slingId = slingId;
+        this.resourceResolverFactory = resourceResolverFactory;
+        this.syncTokenTimeoutMillis = syncTokenTimeoutMillis;
+    }
+
+    /** Cancels the most recently started background check, if there is one. */
+    protected void cancelPreviousBackgroundCheck() {
+        BackgroundCheckRunnable current = backgroundCheckRunnable;
+        if (current!=null) {
+            current.cancel();
+            // leave backgroundCheckRunnable field as is
+            // as that does not represent a memory leak
+            // nor is it a problem to invoke cancel multiple times
+            // but properly synchronizing on just setting backgroundCheckRunnable
+            // back to null is error-prone and overkill
+        }
+    }
+
+    /**
+     * Cancels the previous background check and runs the given check once
+     * in the calling thread. If that succeeds the callback is invoked right
+     * away (also in the calling thread), otherwise a daemon thread is
+     * spawned which keeps checking.
+     *
+     * @param threadName name of the background thread
+     * @param check the condition to poll
+     * @param callback invoked once the check succeeded or the timeout was
+     * hit - may be null if there is nothing to invoke
+     * @param timeoutMillis timeout in millis, or -1 for no timeout
+     */
+    protected void startBackgroundCheck(String threadName, final BackgroundCheck check, final Runnable callback, final long timeoutMillis) {
+        // cancel the current one if it's still running
+        cancelPreviousBackgroundCheck();
+
+        if (check.check()) {
+            // then we're not even going to start the background-thread
+            // we're already done
+            if (callback == null) {
+                logger.info("backgroundCheck: already done, backgroundCheck successful (no callback to invoke)");
+            } else {
+                logger.info("backgroundCheck: already done, backgroundCheck successful, invoking callback");
+                callback.run();
+            }
+            return;
+        }
+        logger.info("backgroundCheck: spawning background-thread for '"+threadName+"'");
+        BackgroundCheckRunnable runnable = new BackgroundCheckRunnable(callback, check, timeoutMillis, threadName);
+        backgroundCheckRunnable = runnable;
+        Thread th = new Thread(runnable);
+        th.setName(threadName);
+        th.setDaemon(true);
+        th.start();
+    }
+
+    /** Get or create a ResourceResolver **/
+    protected ResourceResolver getResourceResolver() throws LoginException {
+        return resourceResolverFactory.getAdministrativeResourceResolver(null);
+    }
+
+    @Override
+    public void sync(BaseTopologyView view, Runnable callback) {
+        // cancel the previous background-check if it's still running
+        cancelPreviousBackgroundCheck();
+
+        syncToken(view, callback);
+        // this implementation doesn't support wait-for-backlog, so
+        // the above syncToken will already terminate with invoking the callback
+    }
+
+    /**
+     * Stores the local sync-token and then starts a background check that
+     * succeeds once all instances of the local cluster have stored the
+     * same sync-token (or the sync-token timeout is hit).
+     */
+    protected void syncToken(final BaseTopologyView view, final Runnable callback) {
+        // 1) first storing my syncToken
+        try {
+            storeMySyncToken(view.getLocalClusterSyncTokenId());
+        } catch (LoginException e) {
+            logger.error("syncToken: will run into timeout: could not login for storing my syncToken: "+e, e);
+        } catch (PersistenceException e) {
+            logger.error("syncToken: will run into timeout: got PersistenceException while storing my syncToken: "+e, e);
+        }
+        // if anything goes wrong above, then this will mean for the others
+        // that they will have to wait until the timeout hits
+        // which means we should do the same..
+        // hence no further action possible on error above
+
+        // 2) then check if all others have done the same already
+        startBackgroundCheck("SyncTokenConsistencyService", new BackgroundCheck() {
+
+            @Override
+            public boolean check() {
+                return seenAllSyncTokens(view);
+            }
+        }, callback, syncTokenTimeoutMillis);
+    }
+
+    /**
+     * Stores the given sync-token id under the local slingId
+     * (unless that very value is stored there already) and commits.
+     */
+    private void storeMySyncToken(String syncTokenId) throws LoginException, PersistenceException {
+        logger.trace("storeMySyncToken: start");
+        ResourceResolver resourceResolver = null;
+        try{
+            resourceResolver = getResourceResolver();
+            final Resource resource = getOrCreateResource(resourceResolver, SYNCTOKEN_PATH);
+            ModifiableValueMap syncTokens = resource.adaptTo(ModifiableValueMap.class);
+            if (syncTokens == null) {
+                // adaptTo may return null - report it the same way as any other
+                // failure to store the token, which the caller logs
+                throw new PersistenceException("could not adapt '"+SYNCTOKEN_PATH+"' to a ModifiableValueMap");
+            }
+            // equals() is false for a null currentValue, so this also covers 'not yet set'
+            if (!syncTokenId.equals(syncTokens.get(slingId))) {
+                syncTokens.put(slingId, syncTokenId);
+            }
+            resourceResolver.commit();
+            logger.info("syncToken: stored syncToken of slingId="+slingId+" as="+syncTokenId);
+        } finally {
+            logger.trace("storeMySyncToken: end");
+            if (resourceResolver!=null) {
+                resourceResolver.close();
+            }
+        }
+    }
+
+    /**
+     * @return true if every instance of the view's local cluster has
+     * stored the view's sync-token id, false otherwise or on error
+     */
+    private boolean seenAllSyncTokens(BaseTopologyView view) {
+        logger.trace("seenAllSyncTokens: start");
+        ResourceResolver resourceResolver = null;
+        try{
+            resourceResolver = getResourceResolver();
+            Resource resource = getOrCreateResource(resourceResolver, SYNCTOKEN_PATH);
+            ValueMap syncTokens = resource.adaptTo(ValueMap.class);
+            if (syncTokens == null) {
+                logger.warn("seenAllSyncTokens: could not adapt '"+SYNCTOKEN_PATH+"' to a ValueMap");
+                return false;
+            }
+            String syncToken = view.getLocalClusterSyncTokenId();
+
+            for (InstanceDescription instance : view.getLocalInstance().getClusterView().getInstances()) {
+                Object currentValue = syncTokens.get(instance.getSlingId());
+                if (currentValue == null) {
+                    logger.info("seenAllSyncTokens: no syncToken of "+instance);
+                    return false;
+                }
+                if (!syncToken.equals(currentValue)) {
+                    logger.info("seenAllSyncTokens: old syncToken of " + instance
+                            + " : expected=" + syncToken + " got="+currentValue);
+                    return false;
+                }
+            }
+
+            resourceResolver.commit();
+            logger.info("seenAllSyncTokens: seen all syncTokens!");
+            return true;
+        } catch (LoginException e) {
+            logger.error("seenAllSyncTokens: could not login: "+e, e);
+            return false;
+        } catch (PersistenceException e) {
+            logger.error("seenAllSyncTokens: got PersistenceException: "+e, e);
+            return false;
+        } finally {
+            logger.trace("seenAllSyncTokens: end");
+            if (resourceResolver!=null) {
+                resourceResolver.close();
+            }
+        }
+    }
+
+    /** for testing only! **/
+    protected void triggerBackgroundCheck() {
+        BackgroundCheckRunnable backgroundOp = backgroundCheckRunnable;
+        if (backgroundOp!=null) {
+            backgroundOp.triggerCheck();
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClusterTest {
+    
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private List<ViewStateManagerImpl> mgrList;
+    
+    private Random defaultRandom;
+
+    /** Creates the per-test fixtures: a fixed-seed Random and an empty manager list. */
+    @Before
+    public void setup() throws Exception {
+        // I want randomness yes, but deterministic, for some methods at least
+        defaultRandom = new Random(1234123412);
+        mgrList = new LinkedList<ViewStateManagerImpl>();
+    }
+    
+    /** Drops the references to the per-test fixtures created in setup(). */
+    @After
+    public void teardown() throws Exception {
+        defaultRandom = null;
+        mgrList = null;
+    }
+    
+    private ViewStateManagerImpl newMgr() {
+        ViewStateManagerImpl mgr = new ViewStateManagerImpl(new ReentrantLock(), new ConsistencyService() {
+            
+            public void sync(BaseTopologyView view, Runnable callback) {
+                callback.run();
+            }
+        });
+        mgrList.add(mgr);
+        return mgr;
+    }
+    
+    /**
+     * Blocks, polling every 10ms, until the manager's async event sender reports
+     * no in-flight event. Returns immediately when the manager has no async
+     * event sender yet.
+     *
+     * @param mgr the manager to wait for; must not be null
+     * @throws IllegalArgumentException if mgr is null
+     * @throws InterruptedException if interrupted while sleeping between polls
+     */
+    private void waitForInflightEvents(ViewStateManagerImpl mgr) throws InterruptedException {
+        if (mgr==null) {
+            throw new IllegalArgumentException("mgr must not be null");
+        }
+        if (mgr.getAsyncEventSender()!=null) {
+            // the sender is re-read on every poll, exactly one lookup per check
+            while (mgr.getAsyncEventSender().hasInFlightEvent()) {
+                logger.info("waitForInflightEvents: waiting 10ms...");
+                Thread.sleep(10);
+            }
+            return;
+        }
+        logger.info("waitForInflightEvents: mgr not yet activated...");
+    }
+    
+    /**
+     * Waits for the manager's in-flight events, then checks that the listener
+     * has recorded exactly the given event types, in the given order.
+     *
+     * @param mgr the manager whose in-flight events are awaited first
+     * @param l the listener whose recorded events are checked
+     * @param types the expected event types in order; none means no event is expected
+     * @throws InterruptedException if interrupted while waiting for in-flight events
+     */
+    private void assertCountEvents(ViewStateManagerImpl mgr, Listener l, TopologyEvent.Type... types) throws InterruptedException {
+        waitForInflightEvents(mgr);
+        assertEquals(types.length, l.countEvents());
+        Iterator<TopologyEvent> it = l.getEvents().iterator();
+        int i=0;
+        while(it.hasNext() && (i<types.length)) {
+            // assertEquals takes (expected, actual): the caller-supplied type is the
+            // expectation, the recorded event is what was actually received
+            Type expectedType = types[i++];
+            TopologyEvent receivedEvent = it.next();
+            assertEquals(expectedType, receivedEvent.getType());
+        }
+        if (it.hasNext()) {
+            // the listener holds more events than the caller listed
+            StringBuilder additionalTypes = new StringBuilder();
+            while(it.hasNext()) {
+                additionalTypes.append(",");
+                additionalTypes.append(it.next().getType());
+            }
+            fail("got more events than expected : "+additionalTypes);
+        }
+        if (i<types.length) {
+            // the caller listed more types than the listener has events
+            StringBuilder additionalTypes = new StringBuilder();
+            while(i<types.length) {
+                additionalTypes.append(",");
+                additionalTypes.append(types[i++]);
+            }
+            fail("did not get all events, also expected : "+additionalTypes);
+        }
+    }
+
+    /**
+     * Fails the running test with the given message.
+     * <p>
+     * This class statically imports only assertEquals, so calls to fail(...)
+     * inside it resolve to this method; it therefore has to actually throw
+     * instead of returning silently.
+     *
+     * @param string the failure message
+     * @throws AssertionError always
+     */
+    private void fail(String string) {
+        throw new AssertionError(string);
+    }
+
+    /**
+     * Drives two managers (one listener each) through binding, activation,
+     * non-current and current views, a pseudo network partition and a rejoin,
+     * checking after every step which events each listener has recorded.
+     */
+    @Test
+    public void testTwoNodes() throws Exception {
+        final ViewStateManagerImpl mgr1 = newMgr();
+        final String slingId1 = UUID.randomUUID().toString();
+        final ViewStateManagerImpl mgr2 = newMgr();
+        final String slingId2 = UUID.randomUUID().toString();
+
+        // bind l1
+        Listener l1 = new Listener();
+        mgr1.bind(l1);
+        assertCountEvents(mgr1, l1);
+
+        // bind l2
+        Listener l2 = new Listener();
+        mgr2.bind(l2);
+        assertCountEvents(mgr2, l2);
+
+        // fiddle with l1 - without any events expected to be sent
+        mgr1.handleChanging();
+        assertCountEvents(mgr1, l1);
+        mgr1.handleActivated();
+        assertCountEvents(mgr1, l1);
+        mgr1.handleChanging();
+        assertCountEvents(mgr1, l1);
+
+        // fiddle with l2 - without any events expected to be sent
+        mgr2.handleChanging();
+        assertCountEvents(mgr2, l2);
+        mgr2.handleActivated();
+        assertCountEvents(mgr2, l2);
+        mgr2.handleChanging();
+        assertCountEvents(mgr2, l2);
+
+        // call handleNewView with not-current views first...
+        BaseTopologyView vA1 = TestHelper.newView(false, slingId1, slingId1, slingId1, slingId2);
+        mgr1.handleNewView(vA1);
+        assertCountEvents(mgr1, l1);
+        assertCountEvents(mgr2, l2);
+        BaseTopologyView vB1 = TestHelper.newView(false, slingId1, slingId2, slingId1, slingId2);
+        mgr2.handleNewView(vB1);
+        assertCountEvents(mgr1, l1);
+        assertCountEvents(mgr2, l2);
+
+        // then call handleNewView with a current view - that should now send the INIT
+        BaseTopologyView vA2 = TestHelper.newView(true, slingId1, slingId1, slingId1, slingId2);
+        mgr1.handleNewView(vA2);
+        assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT);
+        assertCountEvents(mgr2, l2);
+        BaseTopologyView vB2 = TestHelper.newView(true, slingId1, slingId2, slingId1, slingId2);
+        mgr2.handleNewView(vB2);
+        assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT);
+        assertCountEvents(mgr2, l2, Type.TOPOLOGY_INIT);
+
+        // now let instance1 get decoupled from the cluster (pseudo-network-partitioning)
+        BaseTopologyView vB3 = TestHelper.newView(true, slingId2, slingId2, slingId2);
+        mgr2.handleNewView(vB3);
+        assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT);
+        assertCountEvents(mgr2, l2, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+
+        // now let instance1 take note of this decoupling
+        mgr1.handleChanging();
+        assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING);
+        assertCountEvents(mgr2, l2, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+
+        // and now let instance1 rejoin
+        BaseTopologyView vA4 = TestHelper.newView(true, slingId2, slingId1, slingId1, slingId2);
+        mgr1.handleNewView(vA4);
+        assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+        assertCountEvents(mgr2, l2, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+        BaseTopologyView vB4 = TestHelper.newView(true, slingId2, slingId2, slingId1, slingId2);
+        // hand mgr2 the view built for it (vB4), not mgr1's vA4
+        // NOTE(review): assumes the third newView argument is the local slingId,
+        // following the vA*/vB* pattern used above - confirm against TestHelper
+        mgr2.handleNewView(vB4);
+        assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+        assertCountEvents(mgr2, l2, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+
+/**
+ * Test TopologyEventListener that records every event it is handed and
+ * remembers the most recent one. All accessors synchronize on this instance.
+ */
+public class Listener implements TopologyEventListener {
+
+    /** every event received so far, in arrival order (until clearEvents()) */
+    private final List<TopologyEvent> events = new LinkedList<TopologyEvent>();
+    /** the most recently received event, or null if none arrived yet */
+    private TopologyEvent lastEvent;
+
+    /** Records the event and remembers it as the last one seen. */
+    public synchronized void handleTopologyEvent(TopologyEvent event) {
+        events.add(event);
+        lastEvent = event;
+    }
+
+    /**
+     * Returns a read-only view of the recorded events. The view is backed by
+     * the internal list, so events recorded later show up in it as well.
+     */
+    public synchronized List<TopologyEvent> getEvents() {
+        return Collections.unmodifiableList(events);
+    }
+
+    /** Returns the number of events currently recorded. */
+    public synchronized int countEvents() {
+        return events.size();
+    }
+
+    /** Returns the most recently received event, or null if there was none. */
+    public synchronized TopologyEvent getLastEvent() {
+        return lastEvent;
+    }
+
+    /** Forgets the recorded events; the last event is left untouched. */
+    public synchronized void clearEvents() {
+        events.clear();
+    }
+
+    /**
+     * Returns the view carried by the last event: the new view for
+     * TOPOLOGY_INIT, PROPERTIES_CHANGED and TOPOLOGY_CHANGED, the old view for
+     * TOPOLOGY_CHANGING, or null when no event has been received yet.
+     * <p>
+     * Synchronized like the other accessors, because lastEvent is written
+     * under this lock in handleTopologyEvent.
+     */
+    public synchronized BaseTopologyView getLastView() {
+        if (lastEvent==null) {
+            return null;
+        } else {
+            switch(lastEvent.getType()) {
+            case TOPOLOGY_INIT:
+            case PROPERTIES_CHANGED:
+            case TOPOLOGY_CHANGED: {
+                return (BaseTopologyView) lastEvent.getNewView();
+            }
+            case TOPOLOGY_CHANGING:{
+                return (BaseTopologyView) lastEvent.getOldView();
+            }
+            default: {
+                fail("no other types supported yet");
+            }
+            }
+        }
+        return null;
+    }
+
+}
\ No newline at end of file

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+
+/**
+ * Minimal ClusterView for tests: a fixed id plus a list of instances that
+ * can be added to and removed from.
+ */
+public class SimpleClusterView implements ClusterView {
+
+    /** the instances of this cluster, in insertion order */
+    private final List<InstanceDescription> instances = new LinkedList<InstanceDescription>();
+    /** the id handed to the constructor */
+    private final String id;
+
+    public SimpleClusterView(String id) {
+        this.id = id;
+    }
+
+    @Override
+    public String getId() {
+        return this.id;
+    }
+
+    /** Appends the given instance to this cluster. */
+    public void addInstanceDescription(InstanceDescription id) {
+        this.instances.add(id);
+    }
+
+    /**
+     * Removes the given instance from this cluster.
+     *
+     * @return true if the instance was part of this cluster
+     */
+    public boolean removeInstanceDescription(InstanceDescription id) {
+        final boolean removed = this.instances.remove(id);
+        return removed;
+    }
+
+    /** Returns a read-only view of the instances. */
+    @Override
+    public List<InstanceDescription> getInstances() {
+        return Collections.unmodifiableList(this.instances);
+    }
+
+    /**
+     * Returns the first instance that reports itself as leader.
+     *
+     * @throws IllegalStateException if no instance is a leader
+     */
+    @Override
+    public InstanceDescription getLeader() {
+        final Iterator<InstanceDescription> it = this.instances.iterator();
+        while (it.hasNext()) {
+            final InstanceDescription candidate = it.next();
+            if (candidate.isLeader()) {
+                return candidate;
+            }
+        }
+        throw new IllegalStateException("no leader");
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import org.apache.sling.discovery.DiscoveryService;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+
+/** Test DiscoveryService that hands out whatever view was set on it last. */
+public class SimpleDiscoveryService implements DiscoveryService {
+
+    /** the view returned by getTopology(); null until setTopoology is called */
+    private BaseTopologyView topologyView;
+
+    /** Stores the view that getTopology() returns from now on. */
+    public void setTopoology(BaseTopologyView topologyView) {
+        this.topologyView = topologyView;
+    }
+
+    /** Returns the view last passed to setTopoology, or null if there was none. */
+    @Override
+    public TopologyView getTopology() {
+        final TopologyView current = this.topologyView;
+        return current;
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+
+/**
+ * Simple InstanceDescription for tests. Leader flag, local flag, slingId and
+ * properties are fixed at construction; the cluster view is set afterwards.
+ * Equality and hash code are based on slingId, leader flag and local flag.
+ */
+public class SimpleInstanceDescription implements InstanceDescription {
+
+    private ClusterView clusterView;
+    private final boolean isLeader;
+    private final boolean isLocal;
+    private final String slingId;
+    /** stored as given to the constructor; may be null */
+    private final Map<String, String> properties;
+
+    /**
+     * @param isLeader whether this instance reports itself as leader
+     * @param isLocal whether this instance reports itself as local
+     * @param slingId the id of this instance
+     * @param properties the instance properties; null is treated as no properties
+     */
+    public SimpleInstanceDescription(boolean isLeader, boolean isLocal, String slingId,
+            Map<String, String> properties) {
+        this.isLeader = isLeader;
+        this.isLocal = isLocal;
+        this.slingId = slingId;
+        this.properties = properties;
+    }
+
+    @Override
+    public String toString() {
+        return "Instance["+slingId+"]";
+    }
+
+    @Override
+    public int hashCode() {
+        return slingId.hashCode() + (isLeader?0:1) + (isLocal?0:1);
+    }
+
+    /** Two instances are equal when slingId, leader flag and local flag all match. */
+    @Override
+    public boolean equals(Object obj) {
+        if (!(obj instanceof SimpleInstanceDescription)) {
+            return false;
+        }
+        SimpleInstanceDescription other = (SimpleInstanceDescription) obj;
+        if (!slingId.equals(other.slingId)) {
+            return false;
+        }
+        if (isLeader!=other.isLeader) {
+            return false;
+        }
+        if (isLocal!=other.isLocal) {
+            return false;
+        }
+        return true;
+    }
+
+    public void setClusterView(ClusterView clusterView) {
+        this.clusterView = clusterView;
+    }
+
+    @Override
+    public ClusterView getClusterView() {
+        return clusterView;
+    }
+
+    @Override
+    public boolean isLeader() {
+        return isLeader;
+    }
+
+    @Override
+    public boolean isLocal() {
+        return isLocal;
+    }
+
+    @Override
+    public String getSlingId() {
+        return slingId;
+    }
+
+    /**
+     * Returns the value of the named property, or null if it is not set or
+     * this instance was created without properties.
+     */
+    @Override
+    public String getProperty(String name) {
+        // properties may be null; treat that as "no properties", the same way
+        // getProperties() does, instead of failing with a NullPointerException
+        if (properties==null) {
+            return null;
+        }
+        return properties.get(name);
+    }
+
+    /** Returns a fresh copy of the properties; empty when none were given. */
+    @Override
+    public Map<String, String> getProperties() {
+        if (properties==null) {
+            return new HashMap<String, String>();
+        }
+        return new HashMap<String,String>(properties);
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+
+public class SimpleScheduler implements Scheduler {
+
+    private boolean failMode;
+
+    @Override
+    public void addJob(String name, Object job, Map<String, Serializable> config, String schedulingExpression,
+            boolean canRunConcurrently) throws Exception {
+        throw new IllegalStateException("not yet impl");
+    }
+
+    @Override
+    public void addPeriodicJob(String name, Object job, Map<String, Serializable> config, long period, boolean canRunConcurrently)
+            throws Exception {
+        throw new IllegalStateException("not yet impl");
+    }
+
+    @Override
+    public void addPeriodicJob(String name, Object job, Map<String, Serializable> config, long period, boolean canRunConcurrently,
+            boolean startImmediate) throws Exception {
+        throw new IllegalStateException("not yet impl");
+    }
+
+    @Override
+    public void fireJob(Object job, Map<String, Serializable> config) throws Exception {
+        throw new IllegalStateException("not yet impl");
+    }
+
+    @Override
+    public boolean fireJob(Object job, Map<String, Serializable> config, int times, long period) {
+        throw new IllegalStateException("not yet impl");
+    }
+
+    @Override
+    public void fireJobAt(String name, final Object job, Map<String, Serializable> config, final Date date) throws Exception {
+        if (!(job instanceof Runnable)) {
+            throw new IllegalArgumentException("only runnables supported for now");
+        }
+        final Runnable j = (Runnable)job;
+        Runnable r = new Runnable() {
+
+            @Override
+            public void run() {
+                while (System.currentTimeMillis()<date.getTime()) {
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                        Thread.yield();
+                    }
+                }
+                j.run();
+            }
+            
+        };
+        async(r, name);
+    }
+
+    private void async(Runnable r, String name) {
+        if (failMode) {
+            throw new IllegalStateException("failMode");
+        }
+        Thread th = new Thread(r);
+        th.setName("async test thread for "+name);
+        th.setDaemon(true);
+        th.start();
+    }
+
+    @Override
+    public boolean fireJobAt(String name, Object job, Map<String, Serializable> config, Date date, int times, long period) {
+        throw new IllegalStateException("not yet impl");
+    }
+
+    @Override
+    public void removeJob(String name) throws NoSuchElementException {
+        throw new IllegalStateException("not yet impl");
+    }
+
+    public void failMode() {
+        failMode = true;
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.InstanceFilter;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+
+public class SimpleTopologyView extends BaseTopologyView {
+
+    private List<InstanceDescription> instances = new LinkedList<InstanceDescription>();
+
+    private final String id;
+
+    /** Creates a view whose id is a freshly generated random UUID string. */
+    public SimpleTopologyView() {
+        this(UUID.randomUUID().toString());
+    }
+
+    /** Creates a view with the given id. */
+    public SimpleTopologyView(String id) {
+        this.id = id;
+    }
+    
+    /**
+     * Two views are equal when their ids match, they hold the same number of
+     * instances, and every instance of this view equals some instance of the
+     * other view.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this==obj) {
+            return true;
+        }
+        if (!(obj instanceof SimpleTopologyView)) {
+            return false;
+        }
+        final SimpleTopologyView that = (SimpleTopologyView) obj;
+        if (!id.equals(that.id)) {
+            return false;
+        }
+        if (instances.size()!=that.instances.size()) {
+            return false;
+        }
+        for (InstanceDescription mine : instances) {
+            boolean matched = false;
+            for (InstanceDescription theirs : that.instances) {
+                if (mine.equals(theirs)) {
+                    matched = true;
+                    break;
+                }
+            }
+            if (!matched) {
+                return false;
+            }
+        }
+        return true;
+    }
+    
+    /** Returns the sum of the hash codes of all instances; the id is not included. */
+    @Override
+    public int hashCode() {
+        int sum = 0;
+        for (InstanceDescription instance : instances) {
+            sum += instance.hashCode();
+        }
+        return sum;
+    }
+
+    /** Appends the given instance to this view as is; its cluster view is not touched. */
+    public void addInstanceDescription(InstanceDescription id) {
+        this.instances.add(id);
+    }
+    
+    /**
+     * Returns the single instance of this view that reports itself as local.
+     *
+     * @throws IllegalStateException if there is no local instance or more than one
+     */
+    @Override
+    public InstanceDescription getLocalInstance() {
+        InstanceDescription local = null;
+        for (InstanceDescription candidate : instances) {
+            if (!candidate.isLocal()) {
+                continue;
+            }
+            if (local!=null) {
+                throw new IllegalStateException("multiple local instances");
+            }
+            local = candidate;
+        }
+        if (local==null) {
+            throw new IllegalStateException("no local instance found");
+        }
+        return local;
+    }
+
+    /** Returns a new set holding the instances of this view. */
+    @Override
+    public Set<InstanceDescription> getInstances() {
+        final Set<InstanceDescription> snapshot = new HashSet<InstanceDescription>(instances);
+        return snapshot;
+    }
+
+    /**
+     * Not supported by this test view.
+     *
+     * @throws IllegalStateException always
+     */
+    @Override
+    public Set<InstanceDescription> findInstances(InstanceFilter filter) {
+        final String message = "not yet implemented";
+        throw new IllegalStateException(message);
+    }
+
+    /** Collects the cluster view of every instance of this view into a new set. */
+    @Override
+    public Set<ClusterView> getClusterViews() {
+        final Set<ClusterView> result = new HashSet<ClusterView>();
+        final Iterator<InstanceDescription> it = instances.iterator();
+        while (it.hasNext()) {
+            result.add(it.next().getClusterView());
+        }
+        return result;
+    }
+
+    /** Returns the id of this view, which doubles as the sync token id. */
+    @Override
+    public String getLocalClusterSyncTokenId() {
+        return this.id;
+    }
+
+    /**
+     * Adds a new instance with a random slingId, flagged as leader and local,
+     * as the only member of a new cluster with a random id.
+     *
+     * @return this view, for chaining
+     */
+    public SimpleTopologyView addInstance() {
+        final String newSlingId = UUID.randomUUID().toString();
+        final SimpleClusterView ownCluster = new SimpleClusterView(UUID.randomUUID().toString());
+        // leader=true, local=true, no properties
+        final SimpleInstanceDescription created = new SimpleInstanceDescription(true, true, newSlingId, null);
+        created.setClusterView(ownCluster);
+        ownCluster.addInstanceDescription(created);
+        instances.add(created);
+        return this;
+    }
+
+    /**
+     * Creates a new instance description (without properties) and registers
+     * it with both the given cluster and this view.
+     * @param slingId the sling id of the new instance
+     * @param cluster the cluster view the new instance becomes part of
+     * @param isLeader whether the new instance is flagged as leader
+     * @param isLocal whether the new instance is flagged as local
+     * @return this view, to allow chaining
+     */
+    public SimpleTopologyView addInstance(String slingId, SimpleClusterView cluster, boolean isLeader, boolean isLocal) {
+        final SimpleInstanceDescription created =
+                new SimpleInstanceDescription(isLeader, isLocal, slingId, null);
+        created.setClusterView(cluster);
+        cluster.addInstanceDescription(created);
+        this.instances.add(created);
+        return this;
+    }
+
+    /**
+     * Adds a copy of the given instance to this view. The copy takes over
+     * sling id, leader flag, local flag and properties of the artefact and
+     * is registered with the artefact's cluster view.
+     * @param artefact the instance to copy; its cluster view must be
+     * a {@link SimpleClusterView}
+     * @return this view, to allow chaining
+     */
+    public SimpleTopologyView addInstance(InstanceDescription artefact) {
+        final SimpleClusterView cluster = (SimpleClusterView) artefact.getClusterView();
+        // hand over a private copy of the properties: passing null here would
+        // silently drop them, and sharing the map would couple copy and artefact
+        final Map<String, String> artefactProperties = artefact.getProperties();
+        final Map<String, String> properties = artefactProperties == null
+                ? null : new HashMap<String, String>(artefactProperties);
+        final SimpleInstanceDescription instance = new SimpleInstanceDescription(
+                artefact.isLeader(), artefact.isLocal(), artefact.getSlingId(), properties);
+        instance.setClusterView(cluster);
+        cluster.addInstanceDescription(instance);
+        instances.add(instance);
+        return this;
+    }
+
+    /**
+     * Removes the first instance found with the given sling id from this view
+     * and from the cluster view of that instance.
+     * @param slingId the sling id of the instance to remove
+     * @return this view, to allow chaining
+     * @throws IllegalStateException if no instance with that sling id exists,
+     * or if the cluster view reports that it could not remove the instance
+     */
+    public SimpleTopologyView removeInstance(String slingId) {
+        final Iterator<InstanceDescription> it = instances.iterator();
+        while (it.hasNext()) {
+            final InstanceDescription candidate = it.next();
+            if (!candidate.getSlingId().equals(slingId)) {
+                continue;
+            }
+            // drop it from this view first, then from its cluster
+            it.remove();
+            final SimpleClusterView cluster = (SimpleClusterView) candidate.getClusterView();
+            if (cluster.removeInstanceDescription(candidate)) {
+                return this;
+            }
+            throw new IllegalStateException("could not remove id: "+candidate);
+        }
+        throw new IllegalStateException("instance not found: "+slingId);
+    }
+
+    /**
+     * Creates a new view with the same id as the given one, containing copies
+     * of its instances. A new {@link SimpleClusterView} is created per distinct
+     * cluster id and shared by all copied instances of that cluster.
+     * @param view the view to copy
+     * @return the newly created view
+     */
+    public static SimpleTopologyView clone(final SimpleTopologyView view) {
+        final SimpleTopologyView copy = new SimpleTopologyView(view.id);
+        final Map<String,SimpleClusterView> clustersById = new HashMap<String, SimpleClusterView>();
+        for (InstanceDescription original : view.getInstances()) {
+            final SimpleInstanceDescription duplicate = clone(original);
+            final String clusterId = original.getClusterView().getId();
+            if (!clustersById.containsKey(clusterId)) {
+                // first instance seen of this cluster: create the cluster copy
+                clustersById.put(clusterId, new SimpleClusterView(clusterId));
+            }
+            duplicate.setClusterView(clustersById.get(clusterId));
+            copy.addInstance(duplicate);
+        }
+        return copy;
+    }
+    
+    private static SimpleInstanceDescription clone(InstanceDescription id) {
+        return new SimpleInstanceDescription(id.isLeader(), id.isLocal(), id.getSlingId(), id.getProperties());
+    }
+
+    /**
+     * Returns a copy of this view, created via {@link #clone(SimpleTopologyView)}.
+     */
+    public SimpleTopologyView clone() {
+        final SimpleTopologyView copy = SimpleTopologyView.clone(this);
+        return copy;
+    }
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.EventFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHelper {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestHelper.class);
+
+    public static void assertEvents(ViewStateManagerImpl mgr, Listener listener, TopologyEvent... events) {
+        waitForAsyncEvents(mgr);
+        assertEquals(events.length, listener.countEvents());
+        for (int i = 0; i < events.length; i++) {
+            TopologyEvent e = events[i];
+            assertEquals(e.getType(), listener.getEvents().get(i).getType());
+            switch(e.getType()) {
+            case TOPOLOGY_INIT: {
+                assertNull(listener.getEvents().get(i).getOldView());
+                assertEquals(e.getNewView(), listener.getEvents().get(i).getNewView());
+                break;
+            }
+            case TOPOLOGY_CHANGING: {
+                assertEquals(e.getOldView(), listener.getEvents().get(i).getOldView());
+                assertNull(listener.getEvents().get(i).getNewView());
+                break;
+            }
+            case PROPERTIES_CHANGED:
+            case TOPOLOGY_CHANGED: {
+                assertEquals(e.getOldView(), listener.getEvents().get(i).getOldView());
+                assertEquals(e.getNewView(), listener.getEvents().get(i).getNewView());
+                break;
+            }
+            default: {
+                fail("no other type supported yet");
+            }
+            }
+        }
+        listener.clearEvents();
+    }
+
+    public static void waitForAsyncEvents(ViewStateManagerImpl mgr) {
+        int sleep = 1;
+        while(true) {
+            if (!mgr.getAsyncEventSender().hasInFlightEvent()) {
+                return;
+            }
+            
+            // sleep outside of synchronized to keep test-influence
+            // to a minimum
+            try {
+                Thread.sleep(sleep);
+            } catch (InterruptedException e) {
+                logger.error("waitForFlush: got interrupted: "+e, e);
+            }
+            // minor back-off up until 20ms
+            sleep = Math.min(20, sleep+1);
+        }
+    }
+
+    public static void assertNoEvents(Listener listener) {
+        assertEquals(0, listener.countEvents());
+    }
+
+    /** does couple loops randomly calling handleChanging() (or not) and then handleNewView().
+     * Note: random is passed to allow customizing and not hardcoding this method to a particular random 
+     * @throws InterruptedException **/
+    public static void randomEventLoop(ViewStateManagerImpl mgr, SimpleDiscoveryService sds, int loopSize, int delayInMillis, final Random random, Listener... listeners) throws InterruptedException {
+        for(int i=0; i<loopSize; i++) {
+            final boolean shouldCallChanging = random.nextBoolean();
+            if (shouldCallChanging) {
+                // dont always do a changing
+                logger.debug("randomEventLoop: calling handleChanging...");
+                mgr.handleChanging();
+                // must first wait for async events to have been processed - as otherwise
+                // the 'getLastView()' might not return the correct view
+                logger.debug("randomEventLoop: waiting for async events....");
+                waitForAsyncEvents(mgr);
+                logger.debug("randomEventLoop: asserting CHANGING event was sent...");
+                for(int j=0; j<listeners.length; j++) {
+                    assertEvents(mgr, listeners[j], EventFactory.newChangingEvent(listeners[j].getLastView()));
+                }
+            } else {
+                logger.debug("randomEventLoop: asserting no events...");
+                for(int j=0; j<listeners.length; j++) {
+                    assertNoEvents(listeners[j]);
+                }
+            }
+            final BaseTopologyView view = new SimpleTopologyView().addInstance();
+            BaseTopologyView[] lastViews = new BaseTopologyView[listeners.length];
+            for(int j=0; j<listeners.length; j++) {
+                lastViews[j] = listeners[j].getLastView();
+            }
+            logger.debug("randomEventLoop: calling handleNewView");
+            if (sds!=null) {
+                sds.setTopoology(view);
+            }
+            mgr.handleNewView(view);
+            if (delayInMillis>0) {
+                logger.debug("randomEventLoop: waiting "+delayInMillis+"ms ...");
+                Thread.sleep(delayInMillis);
+                logger.debug("randomEventLoop: waiting "+delayInMillis+"ms done.");
+            }
+            if (!shouldCallChanging) {
+                // in that case I should still get a CHANGING - by contract
+                logger.debug("randomEventLoop: asserting CHANGING, CHANGED events were sent");
+                for(int j=0; j<listeners.length; j++) {
+                    assertEvents(mgr, listeners[j], EventFactory.newChangingEvent(lastViews[j]), EventFactory.newChangedEvent(lastViews[j], view));
+                }
+            } else {
+                logger.debug("randomEventLoop: asserting CHANGED event was sent");
+                for(int j=0; j<listeners.length; j++) {
+                    assertEvents(mgr, listeners[j], EventFactory.newChangedEvent(lastViews[j], view));
+                }
+            }
+        }
+    }
+
+    public static SimpleTopologyView newView(boolean isCurrent, String leaderId, String localId, String... slingIds) {
+        return newView(UUID.randomUUID().toString(), UUID.randomUUID().toString(), isCurrent, leaderId, localId, slingIds);
+    }
+
+    public static SimpleTopologyView newView(String syncId, String clusterId, boolean isCurrent, String leaderId, String localId, String... slingIds) {
+        SimpleTopologyView topology = new SimpleTopologyView(syncId);
+        SimpleClusterView cluster = new SimpleClusterView(clusterId);
+        for (String slingId : slingIds) {
+            SimpleInstanceDescription id = new SimpleInstanceDescription(
+                    slingId.equals(leaderId), slingId.equals(localId), slingId, null);
+            id.setClusterView(cluster);
+            cluster.addInstanceDescription(id);
+            topology.addInstanceDescription(id);
+        }
+        if (!isCurrent) {
+            topology.setNotCurrent();
+        }
+        return topology;
+    }
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java Thu Oct  8 14:31:45 2015
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Random;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.EventFactory;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestMinEventDelayHandler {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    private ViewStateManagerImpl mgr;
+    
+    private Random defaultRandom;
+    
+    private SimpleDiscoveryService sds;
+
+    private Level logLevel;
+
+    private SimpleScheduler scheduler;
+
+    @Before
+    public void setup() throws Exception {
+        mgr = new ViewStateManagerImpl(new ReentrantLock(), new ConsistencyService() {
+            
+            public void sync(BaseTopologyView view, Runnable callback) {
+                callback.run();
+            }
+        });
+        defaultRandom = new Random(1234123412); // I want randomness yes, but deterministic, for some methods at least
+        
+        scheduler = new SimpleScheduler();
+        sds = new SimpleDiscoveryService();
+        mgr.installMinEventDelayHandler(sds, scheduler, 1);
+
+        final org.apache.log4j.Logger discoveryLogger = LogManager.getRootLogger().getLogger("org.apache.sling.discovery");
+        logLevel = discoveryLogger.getLevel();
+        discoveryLogger.setLevel(Level.DEBUG);
+    }
+    
+    @After
+    public void teardown() throws Exception {
+        mgr = null;
+        defaultRandom= null;
+        final org.apache.log4j.Logger discoveryLogger = LogManager.getRootLogger().getLogger("org.apache.sling.discovery");
+        discoveryLogger.setLevel(logLevel);
+    }
+    
+    private void assertNoEvents(Listener listener) {
+        assertEquals(0, listener.countEvents());
+    }
+
+    @Test
+    public void testNormalDelaying() throws Exception {
+        final Listener listener = new Listener();
+        // first activate
+        logger.info("testNormalDelaying: calling handleActivated...");
+        mgr.handleActivated();
+        assertNoEvents(listener); // paranoia
+        // then bind
+        logger.info("testNormalDelaying: calling bind...");
+        mgr.bind(listener);
+        assertNoEvents(listener); // there was no changing or changed yet
+        logger.info("testNormalDelaying: calling handleChanging...");
+        mgr.handleChanging();
+        assertNoEvents(listener);
+        final BaseTopologyView view = new SimpleTopologyView().addInstance();
+        logger.info("testNormalDelaying: calling handleNewView...");
+        mgr.handleNewView(view);
+        TestHelper.assertEvents(mgr, listener, EventFactory.newInitEvent(view));
+        for(int i=0; i<7; i++) {
+            logger.info("testNormalDelaying: calling randomEventLoop...");
+            TestHelper.randomEventLoop(mgr, sds, 4, 1500, defaultRandom, listener);
+            Thread.sleep(1000);
+        }
+    }
+
+    @Test
+    public void testFailedDelaying() throws Exception {
+        scheduler.failMode();
+        final Listener listener = new Listener();
+        // first activate
+        mgr.handleActivated();
+        assertNoEvents(listener); // paranoia
+        // then bind
+        mgr.bind(listener);
+        assertNoEvents(listener); // there was no changing or changed yet
+        mgr.handleChanging();
+        assertNoEvents(listener);
+        final BaseTopologyView view = new SimpleTopologyView().addInstance();
+        mgr.handleNewView(view);
+        TestHelper.assertEvents(mgr, listener, EventFactory.newInitEvent(view));
+        for(int i=0; i<7; i++) {
+            TestHelper.randomEventLoop(mgr, sds, 100, -1, defaultRandom, listener);
+            Thread.sleep(1000);
+        }
+    }
+}

Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native