You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/11/07 09:26:35 UTC

[sling-org-apache-sling-discovery-commons] 32/38: SLING-5094 / SLING-5173 / SLING-4603 related : ensure that before invoking the ConsistencyService.sync no async events are still in the queue. This is achieved by enqueueing an async event too that once it gets triggered ensures that no async events are left. This mechanism ensures that before the syncToken is written, all TopologyEventListeners have received a TOPOLOGY_CHANGING - and only that guarantees that the syncToken mechanism carries a high guarantee.

This is an automated email from the ASF dual-hosted git repository.

rombert pushed a commit to annotated tag org.apache.sling.discovery.commons-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-discovery-commons.git

commit 5a5f5ee4585d1aec441657cad7acbde6f66bab57
Author: Stefan Egli <st...@apache.org>
AuthorDate: Fri Oct 23 11:28:24 2015 +0000

    SLING-5094 / SLING-5173 / SLING-4603 related : ensure that before invoking the ConsistencyService.sync no async events are still in the queue. This is achieved by enqueueing an async event too that once it gets triggered ensures that no async events are left. This mechanism ensures that before the syncToken is written, all TopologyEventListeners have received a TOPOLOGY_CHANGING - and only that guarantees that the syncToken mechanism carries a high guarantee.
    
    git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/discovery/commons@1710175 13f79535-47bb-0310-9956-ffa450edef68
---
 .../commons/providers/ViewStateManager.java        |  5 +-
 .../commons/providers/base/AsyncEventSender.java   | 33 ++++-----
 .../{AsyncEvent.java => AsyncTopologyEvent.java}   | 24 ++++++-
 .../providers/base/ViewStateManagerImpl.java       | 80 +++++++++++++++++-----
 .../commons/providers/base/TestHelper.java         |  2 +
 .../providers/base/TestViewStateManager.java       | 21 ++++--
 .../base/TestOakSyncTokenConsistencyService.java   |  5 +-
 7 files changed, 127 insertions(+), 43 deletions(-)

diff --git a/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java b/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java
index 8c71603..16a70c5 100644
--- a/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java
+++ b/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java
@@ -94,8 +94,9 @@ public interface ViewStateManager {
      * <p>
      * @param timeout time in millis to wait for at max - 0 to not wait at all - -1 
      * to wait indefinitely
-     * @return true if no more async events exist, false if the timeout hit early 
+     * @return 0 if no more async events exist, >0 the number of queued or in-flight (being sent)
+     * events if the timeout hit early
      */
-    boolean waitForAsyncEvents(long timeout);
+    int waitForAsyncEvents(long timeout);
 
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEventSender.java b/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEventSender.java
index 014f522..89b5b13 100644
--- a/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEventSender.java
+++ b/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEventSender.java
@@ -55,7 +55,12 @@ final class AsyncEventSender implements Runnable {
     
     /** Enqueues a particular event for asynchronous sending to a particular listener **/
     void enqueue(TopologyEventListener listener, TopologyEvent event) {
-        final AsyncEvent asyncEvent = new AsyncEvent(listener, event);
+        final AsyncTopologyEvent asyncEvent = new AsyncTopologyEvent(listener, event);
+        enqueue(asyncEvent);
+    }
+
+    /** Enqueues an AsyncEvent for later in-order execution **/
+    void enqueue(final AsyncEvent asyncEvent) {
         synchronized(eventQ) {
             eventQ.add(asyncEvent);
             if (logger.isDebugEnabled()) {
@@ -110,7 +115,7 @@ final class AsyncEventSender implements Runnable {
                         isSending = asyncEvent!=null;
                     }
                     if (asyncEvent!=null) {
-                        sendTopologyEvent(asyncEvent);
+                        asyncEvent.trigger();
                     }
                 } catch(Throwable th) {
                     // Even though we should never catch Error or RuntimeException
@@ -141,25 +146,21 @@ final class AsyncEventSender implements Runnable {
         }
     }
 
-    /** Actual sending of the asynchronous event - catches RuntimeExceptions a listener can send. (Error is caught outside) **/
-    private void sendTopologyEvent(AsyncEvent asyncEvent) {
-        logger.trace("sendTopologyEvent: start");
-        final TopologyEventListener listener = asyncEvent.listener;
-        final TopologyEvent event = asyncEvent.event;
-        try{
-            logger.debug("sendTopologyEvent: sending to listener: {}, event: {}", listener, event);
-            listener.handleTopologyEvent(event);
-        } catch(final Exception e) {
-            logger.warn("sendTopologyEvent: handler threw exception. handler: "+listener+", exception: "+e, e);
-        }
-        logger.trace("sendTopologyEvent: start: listener: {}, event: {}", listener, event);
-    }
-
     /** for testing only: checks whether there are any events being queued or sent **/
     boolean hasInFlightEvent() {
         synchronized(eventQ) {
             return isSending || !eventQ.isEmpty();
         }
     }
+
+    public int getInFlightEventCnt() {
+        synchronized(eventQ) {
+            int cnt = eventQ.size();
+            if (isSending) {
+                cnt++;
+            }
+            return cnt;
+        }
+    }
     
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEvent.java b/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncTopologyEvent.java
similarity index 61%
rename from src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEvent.java
rename to src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncTopologyEvent.java
index 1a73908..4d2a443 100644
--- a/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncEvent.java
+++ b/src/main/java/org/apache/sling/discovery/commons/providers/base/AsyncTopologyEvent.java
@@ -20,12 +20,17 @@ package org.apache.sling.discovery.commons.providers.base;
 
 import org.apache.sling.discovery.TopologyEvent;
 import org.apache.sling.discovery.TopologyEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /** SLING-4755 : encapsulates an event that yet has to be sent (asynchronously) for a particular listener **/
-final class AsyncEvent {
+final class AsyncTopologyEvent implements AsyncEvent {
+    
+    static final Logger logger = LoggerFactory.getLogger(AsyncTopologyEvent.class);
+
     final TopologyEventListener listener;
     final TopologyEvent event;
-    AsyncEvent(TopologyEventListener listener, TopologyEvent event) {
+    AsyncTopologyEvent(TopologyEventListener listener, TopologyEvent event) {
         if (listener==null) {
             throw new IllegalArgumentException("listener must not be null");
         }
@@ -37,6 +42,19 @@ final class AsyncEvent {
     }
     @Override
     public String toString() {
-        return "an AsyncEvent[event="+event+", listener="+listener+"]";
+        return "an AsyncTopologyEvent[event="+event+", listener="+listener+"]";
     }
+
+    /** Actual sending of the asynchronous event - catches RuntimeExceptions a listener can send. (Error is caught outside) **/
+    public void trigger() {
+        logger.trace("trigger: start");
+        try{
+            logger.debug("trigger: sending to listener: {}, event: {}", listener, event);
+            listener.handleTopologyEvent(event);
+        } catch(final Exception e) {
+            logger.warn("trigger: handler threw exception. handler: "+listener+", exception: "+e, e);
+        }
+        logger.trace("trigger: start: listener: {}, event: {}", listener, event);
+    }
+
 }
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/discovery/commons/providers/base/ViewStateManagerImpl.java b/src/main/java/org/apache/sling/discovery/commons/providers/base/ViewStateManagerImpl.java
index 9fa980b..61b39f6 100644
--- a/src/main/java/org/apache/sling/discovery/commons/providers/base/ViewStateManagerImpl.java
+++ b/src/main/java/org/apache/sling/discovery/commons/providers/base/ViewStateManagerImpl.java
@@ -512,26 +512,56 @@ public class ViewStateManagerImpl implements ViewStateManager {
                 // then:
                 // run the set consistencyService
                 final int lastModCnt = modCnt;
-                logger.info("handleNewViewNonDelayed: invoking consistencyService (modCnt={})", modCnt);
-                consistencyService.sync(newView,
-                        new Runnable() {
+                logger.info("handleNewViewNonDelayed: invoking waitForAsyncEvents, then consistencyService (modCnt={})", modCnt);
+                asyncEventSender.enqueue(new AsyncEvent() {
                     
-                    public void run() {
-                        logger.trace("consistencyService.callback.run: start. acquiring lock...");
+                    @Override
+                    public String toString() {
+                        return "the waitForAsyncEvents-flush-token-"+hashCode();
+                    }
+                    
+                    @Override
+                    public void trigger() {
+                        // when this event is triggered we're guaranteed to have 
+                        // no more async events - cos the async events are handled
+                        // in a queue and this AsyncEvent was put at the end of the
+                        // queue at enqueue time. So now e can go ahead.
+                        // the plus using such a token event is that others when
+                        // calling waitForAsyncEvent() will get blocked while this
+                        // 'token async event' is handled. Which is what we explicitly want.
                         lock.lock();
                         try{
-                            logger.debug("consistencyService.callback.run: lock aquired. (modCnt should be {}, is {})", lastModCnt, modCnt);
                             if (modCnt!=lastModCnt) {
-                                logger.info("consistencyService.callback.run: modCnt changed (from {} to {}) - ignoring",
+                                logger.info("handleNewViewNonDelayed/waitForAsyncEvents.run: modCnt changed (from {} to {}) - ignoring",
                                         lastModCnt, modCnt);
                                 return;
                             }
-                            logger.info("consistencyService.callback.run: invoking doHandleConsistent.");
-                            // else:
-                            doHandleConsistent(newView);
+                            logger.info("handleNewViewNonDelayed/waitForAsyncEvents.run: done, now invoking consistencyService (modCnt={})", modCnt);
+                            consistencyService.sync(newView,
+                                    new Runnable() {
+                                
+                                public void run() {
+                                    logger.trace("consistencyService.callback.run: start. acquiring lock...");
+                                    lock.lock();
+                                    try{
+                                        logger.debug("consistencyService.callback.run: lock aquired. (modCnt should be {}, is {})", lastModCnt, modCnt);
+                                        if (modCnt!=lastModCnt) {
+                                            logger.info("consistencyService.callback.run: modCnt changed (from {} to {}) - ignoring",
+                                                    lastModCnt, modCnt);
+                                            return;
+                                        }
+                                        logger.info("consistencyService.callback.run: invoking doHandleConsistent.");
+                                        // else:
+                                        doHandleConsistent(newView);
+                                    } finally {
+                                        lock.unlock();
+                                        logger.trace("consistencyService.callback.run: end.");
+                                    }
+                                }
+                                
+                            });
                         } finally {
                             lock.unlock();
-                            logger.trace("consistencyService.callback.run: end.");
                         }
                     }
                     
@@ -628,22 +658,38 @@ public class ViewStateManagerImpl implements ViewStateManager {
     }
 
     @Override
-    public boolean waitForAsyncEvents(long timeout) {
+    public int waitForAsyncEvents(long timeout) {
         long end = System.currentTimeMillis() + timeout;
-        while(asyncEventSender.hasInFlightEvent() || 
-                (minEventDelayHandler!=null && minEventDelayHandler.isDelaying())) {
+        while(true) {
+            int inFlightEventCnt = getInFlightAsyncEventCnt();
+            if (inFlightEventCnt==0) {
+                // no in-flight events - return 0
+                return 0;
+            }
             if (timeout==0) {
-                return false;
+                // timeout is set to 'no-wait', but we have in-flight events,
+                // return the actual cnt
+                return inFlightEventCnt;
             }
-            if (timeout<0 || System.currentTimeMillis()<end) {
+            if (timeout<0 /*infinite waiting*/ || System.currentTimeMillis()<end) {
                 try {
                     Thread.sleep(50);
                 } catch (InterruptedException e) {
                     // ignore
                 }
+            } else {
+                // timeout hit
+                return inFlightEventCnt;
             }
         }
-        return true;
+    }
+    
+    private int getInFlightAsyncEventCnt() {
+        int cnt = asyncEventSender.getInFlightEventCnt();
+        if (minEventDelayHandler!=null && minEventDelayHandler.isDelaying()) {
+            cnt++;
+        }
+        return cnt;
     }
     
 }
diff --git a/src/test/java/org/apache/sling/discovery/commons/providers/base/TestHelper.java b/src/test/java/org/apache/sling/discovery/commons/providers/base/TestHelper.java
index 23b6821..72ab0d2 100644
--- a/src/test/java/org/apache/sling/discovery/commons/providers/base/TestHelper.java
+++ b/src/test/java/org/apache/sling/discovery/commons/providers/base/TestHelper.java
@@ -19,6 +19,7 @@
 package org.apache.sling.discovery.commons.providers.base;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
@@ -134,6 +135,7 @@ public class TestHelper {
                 Thread.sleep(delayInMillis);
                 logger.debug("randomEventLoop: waiting "+delayInMillis+"ms done.");
             }
+            assertEquals(0, mgr.waitForAsyncEvents(500));
             if (!shouldCallChanging) {
                 // in that case I should still get a CHANGING - by contract
                 logger.debug("randomEventLoop: asserting CHANGING, CHANGED events were sent");
diff --git a/src/test/java/org/apache/sling/discovery/commons/providers/base/TestViewStateManager.java b/src/test/java/org/apache/sling/discovery/commons/providers/base/TestViewStateManager.java
index 13d586d..e80268c 100644
--- a/src/test/java/org/apache/sling/discovery/commons/providers/base/TestViewStateManager.java
+++ b/src/test/java/org/apache/sling/discovery/commons/providers/base/TestViewStateManager.java
@@ -63,7 +63,9 @@ public class TestViewStateManager {
             try {
                 lock.unlock();
                 try{
+                    logger.info("ConsistencyServiceWithSemaphore.sync: acquiring lock ...");
                     semaphore.acquire();
+                    logger.info("ConsistencyServiceWithSemaphore.sync: lock acquired.");
                 } finally {
                     lock.lock();
                 }
@@ -102,6 +104,10 @@ public class TestViewStateManager {
     
     @After
     public void teardown() throws Exception {
+        if (mgr != null) {
+            // release any async event sender ..
+            mgr.handleDeactivated();
+        }
         mgr = null;
         defaultRandom= null;
     }
@@ -240,7 +246,7 @@ public class TestViewStateManager {
         mgr.handleChanging();
         final BaseTopologyView view = new DummyTopologyView().addInstance();
         mgr.handleNewView(view);
-        assertTrue(mgr.waitForAsyncEvents(1000));
+        assertEquals(0, mgr.waitForAsyncEvents(1000));
         TestHelper.assertNoEvents(listener);
         synchronized(syncCallbacks) {
             assertEquals(1, syncCallbacks.size());
@@ -249,14 +255,14 @@ public class TestViewStateManager {
         String id2 = UUID.randomUUID().toString();
         final BaseTopologyView view2 = TestHelper.newView(true, id1, id1, id1, id2); 
         mgr.handleNewView(view2);
-        assertTrue(mgr.waitForAsyncEvents(1000));
+        assertEquals(0, mgr.waitForAsyncEvents(1000));
         TestHelper.assertNoEvents(listener);
         synchronized(syncCallbacks) {
             assertEquals(1, syncCallbacks.size());
             syncCallbacks.get(0).run();
             syncCallbacks.clear();
         }
-        assertTrue(mgr.waitForAsyncEvents(1000));
+        assertEquals(0, mgr.waitForAsyncEvents(1000));
         assertEvents(listener, EventHelper.newInitEvent(view2));
     }
     
@@ -609,10 +615,12 @@ public class TestViewStateManager {
         });
         Thread.sleep(1000);
         TestHelper.assertNoEvents(listener);
+        assertEquals("should have one thread now waiting", 1, serviceSemaphore.getQueueLength());
         serviceSemaphore.release(1); // release the first one only
         Thread.sleep(1000);
         assertEvents(listener, EventHelper.newInitEvent(view1));
         mgr.handleChanging();
+        assertEquals(0, mgr.waitForAsyncEvents(500));
         assertEvents(listener, EventHelper.newChangingEvent(view1));
         async(new Runnable() {
 
@@ -625,6 +633,7 @@ public class TestViewStateManager {
         Thread.sleep(1000);
         logger.debug("run: asserting no events");
         TestHelper.assertNoEvents(listener);
+        assertEquals("should have one thread now waiting", 1, serviceSemaphore.getQueueLength());
         assertFalse("should not be locked", lock.isLocked());
 
         logger.debug("run: issuing a second event");
@@ -649,12 +658,16 @@ public class TestViewStateManager {
         });
         logger.debug("run: waiting 1sec");
         Thread.sleep(1000);
+        int remainingAsyncEvents = mgr.waitForAsyncEvents(2000);
+        logger.info("run: result of waitForAsyncEvent is: "+remainingAsyncEvents);
+        assertEquals("should have one thread now waiting", 1, serviceSemaphore.getQueueLength());
         assertEquals("should be acquiring (by thread2)", 1, testSemaphore.getQueueLength());
         // releasing the testSemaphore
         testSemaphore.release();
         logger.debug("run: waiting 1sec");
         Thread.sleep(1000);
-        assertEquals("should have both threads now waiting", 2, serviceSemaphore.getQueueLength());
+        assertEquals("should have two async events now in the queue or being sent", 2, mgr.waitForAsyncEvents(500));
+        assertEquals("but should only have 1 thread actually sitting on the semaphore waiting", 1, serviceSemaphore.getQueueLength());
         logger.debug("run: releasing consistencyService");
         serviceSemaphore.release(1); // release the first one only
         logger.debug("run: waiting 1sec");
diff --git a/src/test/java/org/apache/sling/discovery/commons/providers/spi/base/TestOakSyncTokenConsistencyService.java b/src/test/java/org/apache/sling/discovery/commons/providers/spi/base/TestOakSyncTokenConsistencyService.java
index a774d76..357c8a5 100644
--- a/src/test/java/org/apache/sling/discovery/commons/providers/spi/base/TestOakSyncTokenConsistencyService.java
+++ b/src/test/java/org/apache/sling/discovery/commons/providers/spi/base/TestOakSyncTokenConsistencyService.java
@@ -136,7 +136,7 @@ public class TestOakSyncTokenConsistencyService {
         DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().me(1).seq(1).activeIds(1).setFinal(true));
         assertTrue(idMapService1.waitForInit(2000));
         cs.triggerBackgroundCheck();
-        assertTrue(vsm.waitForAsyncEvents(1000));
+        assertEquals(0, vsm.waitForAsyncEvents(1000));
         assertEquals(1, l.countEvents());
     }
     
@@ -180,7 +180,10 @@ public class TestOakSyncTokenConsistencyService {
         DummyTopologyView two2 = TestHelper.newView(two1.getLocalClusterSyncTokenId(), two1.getLocalInstance().getClusterView().getId(), true, slingId1, slingId1, slingId1, slingId2);
         vsm2.handleNewView(two2);
         cs1.triggerBackgroundCheck();
+        cs1.triggerBackgroundCheck();
+        cs2.triggerBackgroundCheck();
         cs2.triggerBackgroundCheck();
+        assertEquals(0, vsm1.waitForAsyncEvents(500));
         assertEquals(1, l.countEvents());
         DummyTopologyView oneLeaving = two1.clone();
         oneLeaving.removeInstance(slingId2);

-- 
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.