You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2015/10/08 16:31:46 UTC
svn commit: r1707548 [1/4] - in
/sling/trunk/bundles/extensions/discovery/commons: ./
src/main/java/org/apache/sling/discovery/commons/providers/
src/main/java/org/apache/sling/discovery/commons/providers/impl/
src/main/java/org/apache/sling/discovery/...
Author: stefanegli
Date: Thu Oct 8 14:31:45 2015
New Revision: 1707548
URL: http://svn.apache.org/viewvc?rev=1707548&view=rev
Log:
SLING-5131 : introducing ConsistencyService and an oak-discovery-lite based implementation of it - plus SLING-4697 : support for PROPERTIES_CHANGED added to ViewStateManagerImpl
Added:
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerImpl.java
- copied, changed from r1706814, sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestViewStateManager.java
- copied, changed from r1706814, sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/TestViewStateManager.java
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/DiscoLite.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/MockFactory.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/MockedResource.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/MockedResourceResolver.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/RepositoryHelper.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/TestOakSyncTokenConsistencyService.java (with props)
sling/trunk/bundles/extensions/discovery/commons/src/test/resources/
sling/trunk/bundles/extensions/discovery/commons/src/test/resources/log4j.properties (with props)
Removed:
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java
sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/TestViewStateManager.java
Modified:
sling/trunk/bundles/extensions/discovery/commons/pom.xml
sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/BaseTopologyView.java
Modified: sling/trunk/bundles/extensions/discovery/commons/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/pom.xml?rev=1707548&r1=1707547&r2=1707548&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/pom.xml (original)
+++ sling/trunk/bundles/extensions/discovery/commons/pom.xml Thu Oct 8 14:31:45 2015
@@ -57,11 +57,77 @@
<artifactId>bndlib</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.scheduler</artifactId>
+ <version>2.3.4</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.discovery.api</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.api</artifactId>
+ <version>2.4.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.json</artifactId>
+ <version>2.0.6</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.jcr</groupId>
+ <artifactId>jcr</artifactId>
+ <version>2.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.testing</artifactId>
+ <version>2.0.16</version>
+ <scope>test</scope>
+ <exclusions>
+ <!-- slf4j simple implementation logs INFO + higher to stdout (we don't want that behaviour) -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <!-- also excluding jcl-over-slf4j as we need a newer version of this which is compatible with slf4j 1.6 -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <!-- needed to get the nodetypes for testing properly with sling: prefix -->
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.jcr.resource</artifactId>
+ <version>2.3.8</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-core</artifactId>
+ <version>1.3.7</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-jcr</artifactId>
+ <version>1.3.7</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
@@ -83,5 +149,31 @@
<version>1.9.5</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.13</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-jcr-commons</artifactId>
+ <version>2.11.0</version>
+ <type>bundle</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-api</artifactId>
+ <version>2.11.0</version>
+ <type>bundle</type>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
Modified: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/BaseTopologyView.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/BaseTopologyView.java?rev=1707548&r1=1707547&r2=1707548&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/BaseTopologyView.java (original)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/BaseTopologyView.java Thu Oct 8 14:31:45 2015
@@ -49,4 +49,37 @@ public abstract class BaseTopologyView i
current = false;
}
+ /**
+ * Returns the id that shall be used in the syncToken
+ * by the ConsistencyService.
+ * <p>
+ * The clusterSyncId uniquely identifies each change
+ * of the local cluster for all participating instances.
+ * That means, all participating instances know of the
+ * clusterSyncId and it is the same for all instances.
+ * Whenever an instance joins/leaves the cluster, this
+ * clusterSyncId must change.
+ * <p>
+ * Since this method returns the *local* clusterSyncId,
+ * it doesn't care if a remote cluster experienced
+ * changes - it must only change when the local cluster changes.
+ * However, it *can* change when a remote cluster changes too.
+ * So the requirement is just that it changes *at least* when
+ * the local cluster changes - but implementations
+ * can opt to regard this rather as a TopologyView-ID too
+ * (ie an ID that identifies a particular incarnation
+ * of the TopologyView for all participating instances
+ * in the whole topology).
+ * <p>
+ * This id can further safely be used by the ConsistencyService
+ * to identify a syncToken that it writes and that all
+ * other instances in the local cluster wait for, before
+ * sending a TOPOLOGY_CHANGED event.
+ * <p>
+ * Note that this is obviously not to be confused
+ * with the ClusterView.getId() which is stable throughout
+ * the lifetime of a cluster.
+ */
+ public abstract String getLocalClusterSyncTokenId();
+
}
Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+
+/** Factory for creating TopologyEvents with BaseTopologyView **/
+public class EventFactory {
+
+ /** Simple factory method for creating a TOPOLOGY_INIT event with the given newView **/
+ public static TopologyEvent newInitEvent(final BaseTopologyView newView) {
+ if (newView==null) {
+ throw new IllegalStateException("newView must not be null");
+ }
+ if (!newView.isCurrent()) {
+ throw new IllegalStateException("newView must be current");
+ }
+ return new TopologyEvent(Type.TOPOLOGY_INIT, null, newView);
+ }
+
+ /** Simple factory method for creating a TOPOLOGY_CHANGING event with the given oldView **/
+ public static TopologyEvent newChangingEvent(final BaseTopologyView oldView) {
+ if (oldView==null) {
+ throw new IllegalStateException("oldView must not be null");
+ }
+ if (oldView.isCurrent()) {
+ throw new IllegalStateException("oldView must not be current");
+ }
+ return new TopologyEvent(Type.TOPOLOGY_CHANGING, oldView, null);
+ }
+
+ /** Simple factory method for creating a TOPOLOGY_CHANGED event with the given old and new views **/
+ public static TopologyEvent newChangedEvent(final BaseTopologyView oldView, final BaseTopologyView newView) {
+ if (oldView==null) {
+ throw new IllegalStateException("oldView must not be null");
+ }
+ if (oldView.isCurrent()) {
+ throw new IllegalStateException("oldView must not be current");
+ }
+ if (newView==null) {
+ throw new IllegalStateException("newView must not be null");
+ }
+ if (!newView.isCurrent()) {
+ throw new IllegalStateException("newView must be current");
+ }
+ return new TopologyEvent(Type.TOPOLOGY_CHANGED, oldView, newView);
+ }
+
+ public static TopologyEvent newPropertiesChangedEvent(final BaseTopologyView oldView, final BaseTopologyView newView) {
+ if (oldView==null) {
+ throw new IllegalStateException("oldView must not be null");
+ }
+ if (oldView.isCurrent()) {
+ throw new IllegalStateException("oldView must not be current");
+ }
+ if (newView==null) {
+ throw new IllegalStateException("newView must not be null");
+ }
+ if (!newView.isCurrent()) {
+ throw new IllegalStateException("newView must be current");
+ }
+ return new TopologyEvent(Type.PROPERTIES_CHANGED, oldView, newView);
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/EventFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+
+/** SLING-4755 : encapsulates an event that has yet to be sent (asynchronously) for a particular listener **/
+final class AsyncEvent {
+ final TopologyEventListener listener;
+ final TopologyEvent event;
+ AsyncEvent(TopologyEventListener listener, TopologyEvent event) {
+ if (listener==null) {
+ throw new IllegalArgumentException("listener must not be null");
+ }
+ if (event==null) {
+ throw new IllegalArgumentException("event must not be null");
+ }
+ this.listener = listener;
+ this.event = event;
+ }
+ @Override
+ public String toString() {
+ return "an AsyncEvent[event="+event+", listener="+listener+"]";
+ }
+}
\ No newline at end of file
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEvent.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * SLING-4755 : background runnable that takes care of asynchronously sending events.
+ * <p>
+ * API is: enqueue() puts a listener-event tuple onto the internal Q, which
+ * is processed in a loop in run that does so (uninterruptibly, even catching
+ * Throwables to be 'very safe', but sleeps 5sec if an Error happens) until
+ * flushThenStop() is called - which puts the sender in a state where any pending
+ * events are still sent (flush) but then stops automatically. The argument of
+ * using flush before stop is that the event was originally meant to be sent
+ * before the bundle was stopped - thus just because the bundle is stopped
+ * doesn't undo the event and it still has to be sent. That obviously can
+ * mean that listeners can receive a topology event after deactivate. But I
+ * guess that was already the case before the change to become asynchronous.
+ */
+final class AsyncEventSender implements Runnable {
+
+ static final Logger logger = LoggerFactory.getLogger(AsyncEventSender.class);
+
+ /** stopped is always false until flushThenStop is called **/
+ private boolean stopped = false;
+
+ /** eventQ contains all AsyncEvent objects that have yet to be sent - in order to be sent **/
+ private final List<AsyncEvent> eventQ = new LinkedList<AsyncEvent>();
+
+ /** flag to track whether or not an event is currently being sent (but already taken off the Q) **/
+ private boolean isSending = false;
+
+ /** Enqueues a particular event for asynchronous sending to a particular listener **/
+ void enqueue(TopologyEventListener listener, TopologyEvent event) {
+ final AsyncEvent asyncEvent = new AsyncEvent(listener, event);
+ synchronized(eventQ) {
+ eventQ.add(asyncEvent);
+ if (logger.isDebugEnabled()) {
+ logger.debug("enqueue: enqueued event {} for async sending (Q size: {})", asyncEvent, eventQ.size());
+ }
+ eventQ.notifyAll();
+ }
+ }
+
+ /**
+ * Stops the AsyncEventSender as soon as the queue is empty
+ */
+ void flushThenStop() {
+ synchronized(eventQ) {
+ logger.info("AsyncEventSender.flushThenStop: flushing (size: {}) & stopping...", eventQ.size());
+ stopped = true;
+ eventQ.notifyAll();
+ }
+ }
+
+ /** Main worker loop that dequeues from the eventQ and calls sendTopologyEvent with each **/
+ public void run() {
+ logger.info("AsyncEventSender.run: started.");
+ try{
+ while(true) {
+ try{
+ final AsyncEvent asyncEvent;
+ synchronized(eventQ) {
+ isSending = false;
+ while(!stopped && eventQ.isEmpty()) {
+ try {
+ eventQ.wait();
+ } catch (InterruptedException e) {
+ // issue a log debug but otherwise continue
+ logger.debug("AsyncEventSender.run: interrupted while waiting for async events");
+ }
+ }
+ if (stopped) {
+ if (eventQ.isEmpty()) {
+ // then we have flushed, so we can now finally stop
+ logger.info("AsyncEventSender.run: flush finished. stopped.");
+ return;
+ } else {
+ // otherwise the eventQ is not yet empty, so we are still in flush mode
+ logger.info("AsyncEventSender.run: flushing another event. (pending {})", eventQ.size());
+ }
+ }
+ asyncEvent = eventQ.remove(0);
+ if (logger.isDebugEnabled()) {
+ logger.debug("AsyncEventSender.run: dequeued event {}, remaining: {}", asyncEvent, eventQ.size());
+ }
+ isSending = asyncEvent!=null;
+ }
+ if (asyncEvent!=null) {
+ sendTopologyEvent(asyncEvent);
+ }
+ } catch(Throwable th) {
+ // Even though we should never catch Error or RuntimeException
+ // here's the thinking about doing it anyway:
+ // * in case of a RuntimeException that would be less dramatic
+ // and catching it is less of an issue - we rather want
+ // the background thread to be able to continue than
+ // having it finished just because of a RuntimeException
+ // * catching an Error is of course not so nice.
+ // however, should we really give up this thread even in
+ // case of an Error? It could be an OOM or some other
+ // nasty one, for sure. But even if. Chances are that
+ // other parts of the system would also get that Error
+ // if it is very dramatic. If not, then catching it
+ // sounds feasible.
+ // My two cents..
+ // the goal is to avoid quitting the AsyncEventSender thread
+ logger.error("AsyncEventSender.run: Throwable occurred. Sleeping 5sec. Throwable: "+th, th);
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ logger.warn("AsyncEventSender.run: interrupted while sleeping");
+ }
+ }
+ }
+ } finally {
+ logger.info("AsyncEventSender.run: quits (finally).");
+ }
+ }
+
+ /** Actual sending of the asynchronous event - catches RuntimeExceptions a listener can send. (Error is caught outside) **/
+ private void sendTopologyEvent(AsyncEvent asyncEvent) {
+ logger.trace("sendTopologyEvent: start");
+ final TopologyEventListener listener = asyncEvent.listener;
+ final TopologyEvent event = asyncEvent.event;
+ try{
+ logger.debug("sendTopologyEvent: sending to listener: {}, event: {}", listener, event);
+ listener.handleTopologyEvent(event);
+ } catch(final Exception e) {
+ logger.warn("sendTopologyEvent: handler threw exception. handler: "+listener+", exception: "+e, e);
+ }
+ logger.trace("sendTopologyEvent: start: listener: {}, event: {}", listener, event);
+ }
+
+ /** for testing only: checks whether there are any events being queued or sent **/
+ boolean hasInFlightEvent() {
+ synchronized(eventQ) {
+ return isSending || !eventQ.isEmpty();
+ }
+ }
+
+}
\ No newline at end of file
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/AsyncEventSender.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.Date;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.DiscoveryService;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** hooks into the ViewStateManagerImpl and adds a delay between
+ * TOPOLOGY_CHANGING and TOPOLOGY_CHANGED - with the idea of
+ * bundling multiple TOPOLOGY_CHANGED events should they happen within
+ * a very short amount of time.
+ */
+class MinEventDelayHandler {
+
+ private final static Logger logger = LoggerFactory.getLogger(MinEventDelayHandler.class);
+
+ private boolean isDelaying = false;
+
+ private final Scheduler scheduler;
+
+ private final long minEventDelaySecs;
+
+ private DiscoveryService discoveryService;
+
+ private ViewStateManagerImpl viewStateManager;
+
+ private Lock lock;
+
+ MinEventDelayHandler(ViewStateManagerImpl viewStateManager, Lock lock,
+ DiscoveryService discoveryService, Scheduler scheduler,
+ long minEventDelaySecs) {
+ this.viewStateManager = viewStateManager;
+ this.lock = lock;
+ if (discoveryService==null) {
+ throw new IllegalArgumentException("discoveryService must not be null");
+ }
+ this.discoveryService = discoveryService;
+ this.scheduler = scheduler;
+ if (minEventDelaySecs<=0) {
+ throw new IllegalArgumentException("minEventDelaySecs must be greater than 0 (is "+minEventDelaySecs+")");
+ }
+ this.minEventDelaySecs = minEventDelaySecs;
+ }
+
+ /**
+ * Asks the MinEventDelayHandler to handle the new view
+ * and return true if the caller shouldn't worry about any follow-up action -
+ * only if the method returns false should the caller do the usual
+ * handleNewView action
+ */
+ boolean handlesNewView(BaseTopologyView newView) {
+ if (isDelaying) {
+ // already delaying, so we'll soon ask the DiscoveryServiceImpl for the
+ // latest view and go ahead then
+ logger.info("handleNewView: already delaying, ignoring new view meanwhile");
+ return true;
+ }
+
+ if (!viewStateManager.hadPreviousView()
+ || viewStateManager.isPropertiesDiff(newView)
+ || viewStateManager.unchanged(newView)) {
+ logger.info("handleNewView: we never had a previous view, so we mustn't delay");
+ return false;
+ }
+
+ // thanks to force==true this will always return true
+ if (!triggerAsyncDelaying(newView)) {
+ logger.info("handleNewView: could not trigger async delaying, sending new view now.");
+ viewStateManager.handleNewViewNonDelayed(newView);
+ }
+ return true;
+ }
+
+ private boolean triggerAsyncDelaying(BaseTopologyView newView) {
+ final boolean triggered = runAfter(minEventDelaySecs /*seconds*/ , new Runnable() {
+
+ public void run() {
+ lock.lock();
+ try{
+ // unlock the CHANGED event for any subsequent call to handleTopologyChanged()
+ isDelaying = false;
+
+ // check if the new topology is already ready
+ TopologyView t = discoveryService.getTopology();
+ if (!(t instanceof BaseTopologyView)) {
+ logger.error("asyncDelay.run: topology not of type BaseTopologyView: "+t);
+ // cannot continue in this case
+ return;
+ }
+ BaseTopologyView topology = (BaseTopologyView) t;
+
+ if (topology.isCurrent()) {
+ logger.info("asyncDelay.run: got new view: "+topology);
+ viewStateManager.handleNewViewNonDelayed(topology);
+ } else {
+ logger.info("asyncDelay.run: new view (still) not current, delaying again");
+ triggerAsyncDelaying(topology);
+ // we're actually not interested in the result here
+ // if the async part failed, then we have to rely
+ // on a later handleNewView to come in - we can't
+ // really send a view now cos it is not current.
+ // so we're really stuck to waiting for handleNewView
+ // in this case.
+ }
+ } catch(RuntimeException re) {
+ logger.error("RuntimeException: "+re, re);
+ throw re;
+ } catch(Error er) {
+ logger.error("Error: "+er, er);
+ throw er;
+ } finally {
+ lock.unlock();
+ }
+ }
+ });
+
+ logger.info("triggerAsyncDelaying: asynch delaying of "+minEventDelaySecs+" triggered: "+triggered);
+ if (triggered) {
+ isDelaying = true;
+ }
+ return triggered;
+ }
+
+ /**
+ * run the runnable after the indicated number of seconds, once.
+ * @return true if the scheduling of the runnable worked, false otherwise
+ */
+ private boolean runAfter(long seconds, final Runnable runnable) {
+ final Scheduler theScheduler = scheduler;
+ if (theScheduler == null) {
+ logger.info("runAfter: no scheduler set");
+ return false;
+ }
+ logger.trace("runAfter: trying with scheduler.fireJob");
+ final Date date = new Date(System.currentTimeMillis() + seconds * 1000);
+ try {
+ theScheduler.fireJobAt(null, runnable, null, date);
+ return true;
+ } catch (Exception e) {
+ logger.info("runAfter: could not schedule a job: "+e);
+ return false;
+ }
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/MinEventDelayHandler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.concurrent.locks.Lock;
+
+import org.apache.sling.discovery.commons.providers.ViewStateManager;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
+
+/**
+ * Factory used to create instances of the ViewStateManager implementation
+ * (with the idea to be able to leave the implementation classes
+ * as package-protected)
+ */
+public class ViewStateManagerFactory {
+
+ public static ViewStateManager newViewStateManager(Lock lock,
+ ConsistencyService consistencyService) {
+ return new ViewStateManagerImpl(lock, consistencyService);
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Copied: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerImpl.java (from r1706814, sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java)
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerImpl.java?p2=sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerImpl.java&p1=sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java&r1=1706814&r2=1707548&rev=1707548&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/ViewStateManager.java (original)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/impl/ViewStateManagerImpl.java Thu Oct 8 14:31:45 2015
@@ -16,15 +16,29 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.sling.discovery.commons.providers;
+package org.apache.sling.discovery.commons.providers.impl;
import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.DiscoveryService;
+import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.commons.InstancesDiff;
+import org.apache.sling.discovery.commons.InstancesDiff.InstanceCollection;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.EventFactory;
+import org.apache.sling.discovery.commons.providers.ViewStateManager;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,21 +47,18 @@ import org.slf4j.LoggerFactory;
* the 'view state' (changing vs changed) and sending out the appropriate
* and according TopologyEvents to the registered listeners.
* <p>
- * Note that this class is completely unsynchronized and the idea is that
- * (since this class is only of interest to other implementors/providers of
- * the discovery.api, not to users of the discovery.api however) that those
- * other implementors take care of proper synchronization. Without synchronization
- * this class is prone to threading issues! The most simple form of
- * synchronization is to synchronize all of the public (non-static..) methods
- * with the same monitor object.
+ * Note re synchronization: this class requires a lock object to be passed
+ * in the constructor - this will be applied to all public methods
+ * appropriately. Additionally, the ConsistencyService callback will
+ * also be locked using the provided lock object.
*/
-public class ViewStateManager {
+class ViewStateManagerImpl implements ViewStateManager {
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
+ private static final Logger logger = LoggerFactory.getLogger(ViewStateManagerImpl.class);
/**
* List of bound listeners that have already received their INIT event - others are in unInitializedEventListeners.
- * @see ViewStateManager#unInitializedEventListeners
+ * @see ViewStateManagerImpl#unInitializedEventListeners
*/
private List<TopologyEventListener> eventListeners = new ArrayList<TopologyEventListener>();
@@ -57,7 +68,7 @@ public class ViewStateManager {
* <p>
* This list becomes necessary for cases where the bind() happens before activate, or after activate but at a time
* when the topology is TOPOLOGY_CHANGING - at which point an TOPOLOGY_INIT event can not yet be sent.
- * @see ViewStateManager#eventListeners
+ * @see ViewStateManagerImpl#eventListeners
*/
private List<TopologyEventListener> unInitializedEventListeners = new ArrayList<TopologyEventListener>();
@@ -66,10 +77,10 @@ public class ViewStateManager {
* <p>
* This controls whether handleChanging() and handleNewView() should cause any events
* to be sent - which they do not if called before handleActivated() (or after handleDeactivated())
- * @see ViewStateManager#handleActivated()
- * @see ViewStateManager#handleChanging()
- * @see ViewStateManager#handleNewView(BaseTopologyView)
- * @see ViewStateManager#handleDeactivated()
+ * @see ViewStateManagerImpl#handleActivated()
+ * @see ViewStateManagerImpl#handleChanging()
+ * @see ViewStateManagerImpl#handleNewView(BaseTopologyView)
+ * @see ViewStateManagerImpl#handleDeactivated()
*/
private boolean activated;
@@ -91,262 +102,520 @@ public class ViewStateManager {
* When this goes false, a TOPOLOGY_CHANGED is sent.
*/
private boolean isChanging;
+
+ /**
+ * The lock object with which all public methods are guarded - to be provided in the constructor.
+ */
+ protected final Lock lock;
+
+ /**
+ * An optional ConsistencyService can be provided in the constructor which, when set, will
+ * be invoked upon a new view becoming available (in handleNewView) and the actual
+ * TOPOLOGY_CHANGED event will only be sent once the ConsistencyService.sync method
+ * does the according callback (which can be synchronous or asynchronous again).
+ */
+ private final ConsistencyService consistencyService;
/**
- * Binds the given eventListener, sending it an INIT event if applicable.
- * <p>
- * Note: no synchronization done in ViewStateManager, <b>must</b> be done externally
- * @param eventListener the eventListener that is to bind
+ * A modification counter that increments on each of the following methods:
+ * <ul>
+ * <li>handleActivated()</li>
+ * <li>handleDeactivated()</li>
+ * <li>handleChanging()</li>
+ * <li>handleNewView()</li>
+ * </ul>
+ * with the intent that - when a consistencyService is set - the callback from the
+ * ConsistencyService can check if any of the above methods was invoked - and if so,
+ * it does not send the TOPOLOGY_CHANGED event due to those new facts that happened
+ * while it was synching with the repository.
*/
- public void bind(final TopologyEventListener eventListener) {
+ private int modCnt = 0;
- logger.debug("bind: Binding TopologyEventListener {}",
- eventListener);
-
- if (eventListeners.contains(eventListener) || unInitializedEventListeners.contains(eventListener)) {
- logger.info("bind: TopologyEventListener already registered: "+eventListener);
- return;
+ /** SLING-4755 : reference to the background AsyncEventSender. Started/stopped in activate/deactivate **/
+ private AsyncEventSender asyncEventSender;
+
+ /** SLING-5030 : this map contains the event last sent to each listener to prevent duplicate CHANGING events when scheduler is broken**/
+ private Map<TopologyEventListener,TopologyEvent.Type> lastEventMap = new HashMap<TopologyEventListener, TopologyEvent.Type>();
+
+ private MinEventDelayHandler minEventDelayHandler;
+
+ /**
+ * Creates a new ViewStateManager which synchronizes each method with the given
+ * lock and which optionally uses the given ConsistencyService to sync the repository
+ * upon handling a new view where an instance leaves the local cluster.
+ * @param lock the lock to be used - must not be null
+ * @param consistencyService optional (ie can be null) - the ConsistencyService to
+ * sync the repository upon handling a new view where an instance leaves the local cluster.
+ */
+ ViewStateManagerImpl(Lock lock, ConsistencyService consistencyService) {
+ if (lock==null) {
+ throw new IllegalArgumentException("lock must not be null");
}
+ this.lock = lock;
+ this.consistencyService = consistencyService;
+ }
- if (activated) {
- // check to see in which state we are
- if (isChanging || (previousView==null)) {
- // then we cannot send the TOPOLOGY_INIT at this point - need to delay this
- unInitializedEventListeners.add(eventListener);
+ @Override
+ public void installMinEventDelayHandler(DiscoveryService discoveryService, Scheduler scheduler, long minEventDelaySecs) {
+ this.minEventDelayHandler = new MinEventDelayHandler(this, lock, discoveryService, scheduler, minEventDelaySecs);
+ }
+
+ protected boolean hadPreviousView() {
+ return previousView!=null;
+ }
+
+ protected boolean unchanged(BaseTopologyView newView) {
+ if (isChanging) {
+ return false;
+ }
+ if (previousView==null) {
+ return false;
+ }
+ return previousView.equals(newView);
+ }
+
+ /* (non-Javadoc)
+ * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#bind(org.apache.sling.discovery.TopologyEventListener)
+ */
+ @Override
+ public void bind(final TopologyEventListener eventListener) {
+ logger.trace("bind: start {}", eventListener);
+ lock.lock();
+ try{
+ logger.debug("bind: Binding TopologyEventListener {}",
+ eventListener);
+
+ if (eventListeners.contains(eventListener) || unInitializedEventListeners.contains(eventListener)) {
+ logger.info("bind: TopologyEventListener already registered: "+eventListener);
+ return;
+ }
+
+ if (activated) {
+ // check to see in which state we are
+ if (isChanging || (previousView==null)) {
+ // then we cannot send the TOPOLOGY_INIT at this point - need to delay this
+ if (logger.isDebugEnabled()) {
+ logger.debug("bind: view is not yet/currently not defined (isChanging: "+isChanging+
+ ", previousView==null: "+(previousView==null)+
+ ", delaying INIT to "+eventListener);
+ }
+ unInitializedEventListeners.add(eventListener);
+ } else {
+ // otherwise we can send the TOPOLOGY_INIT now
+ logger.debug("bind: view is defined, sending INIT now to {}",
+ eventListener);
+ enqueue(eventListener, EventFactory.newInitEvent(previousView));
+ eventListeners.add(eventListener);
+ }
} else {
- // otherwise we can send the TOPOLOGY_INIT now
- sendEvent(eventListener, newInitEvent(previousView));
- eventListeners.add(eventListener);
+ logger.debug("bind: not yet activated, delaying INIT to {}",
+ eventListener);
+ unInitializedEventListeners.add(eventListener);
}
- } else {
- unInitializedEventListeners.add(eventListener);
+ } finally {
+ lock.unlock();
+ logger.trace("bind: end");
}
}
- /**
- * Unbinds the given eventListener, returning whether or not it was bound at all.
- * <p>
- * Note: no synchronization done in ViewStateManager, <b>must</b> be done externally
- * @param eventListener the eventListner that is to unbind
- * @return whether or not the listener was added in the first place
+ /* (non-Javadoc)
+ * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#unbind(org.apache.sling.discovery.TopologyEventListener)
*/
+ @Override
public boolean unbind(final TopologyEventListener eventListener) {
-
- logger.debug("unbind: Releasing TopologyEventListener {}",
- eventListener);
-
- // even though a listener must always only ever exist in one of the two,
- // the unbind we do - for safety-paranoia-reasons - remove them from both
- final boolean a = eventListeners.remove(eventListener);
- final boolean b = unInitializedEventListeners.remove(eventListener);
- return a || b;
+ logger.trace("unbind: start {}", eventListener);
+ lock.lock();
+ try{
+ logger.debug("unbind: Releasing TopologyEventListener {}",
+ eventListener);
+
+ // even though a listener must always only ever exist in one of the two,
+ // the unbind we do - for safety-paranoia-reasons - remove them from both
+ final boolean a = eventListeners.remove(eventListener);
+ final boolean b = unInitializedEventListeners.remove(eventListener);
+ return a || b;
+ } finally {
+ lock.unlock();
+ logger.trace("unbind: end");
+ }
}
- /** Internal helper method that sends a given event to a list of listeners **/
- private void sendEvent(final List<TopologyEventListener> audience, final TopologyEvent event) {
- if (logger.isDebugEnabled()) {
- logger.debug("sendEvent: sending topologyEvent {}, to all ({}) listeners", event, audience.size());
+ private void enqueue(final TopologyEventListener da, final TopologyEvent event) {
+ logger.trace("enqueue: start: topologyEvent {}, to {}", event, da);
+ if (asyncEventSender==null) {
+ // this should never happen - enqueue should only be called
+ // when activated (asyncEventSender is created in handleActivated)
+ logger.warn("enqueue: asyncEventSender is null, cannot send event ({}, {})!", da, event);
+ return;
}
+ if (lastEventMap.get(da)==event.getType() && event.getType()==Type.TOPOLOGY_CHANGING) {
+ // don't send TOPOLOGY_CHANGING twice
+ logger.debug("enqueue: listener already got TOPOLOGY_CHANGING: {}", da);
+ return;
+ }
+ logger.debug("enqueue: enqueuing topologyEvent {}, to {}", event, da);
+ asyncEventSender.enqueue(da, event);
+ lastEventMap.put(da, event.getType());
+ logger.trace("enqueue: sending topologyEvent {}, to {}", event, da);
+ }
+
+ /** Internal helper method that sends a given event to a list of listeners **/
+ private void enqueueForAll(final List<TopologyEventListener> audience, final TopologyEvent event) {
+ logger.debug("enqueueForAll: sending topologyEvent {}, to all ({}) listeners", event, audience.size());
for (Iterator<TopologyEventListener> it = audience.iterator(); it.hasNext();) {
TopologyEventListener topologyEventListener = it.next();
- sendEvent(topologyEventListener, event);
- }
- if (logger.isDebugEnabled()) {
- logger.debug("sendEvent: sent topologyEvent {}, to all ({}) listeners", event, audience.size());
+ enqueue(topologyEventListener, event);
}
+ logger.trace("enqueueForAll: sent topologyEvent {}, to all ({}) listeners", event, audience.size());
}
- /** Internal helper method that sends a given event to a particular listener **/
- private void sendEvent(final TopologyEventListener da, final TopologyEvent event) {
- if (logger.isDebugEnabled()) {
- logger.debug("sendEvent: sending topologyEvent {}, to {}", event, da);
- }
+ /* (non-Javadoc)
+ * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#handleActivated()
+ */
+ @Override
+ public void handleActivated() {
+ logger.trace("handleActivated: start");
+ lock.lock();
try{
- da.handleTopologyEvent(event);
- } catch(final Exception e) {
- logger.warn("sendEvent: handler threw exception. handler: "+da+", exception: "+e, e);
+ logger.debug("handleActivated: activating the ViewStateManager");
+ activated = true;
+ modCnt++;
+
+ // SLING-4755 : start the asyncEventSender in the background
+ // will be stopped in deactivate (at which point
+ // all pending events will still be sent but no
+ // new events can be enqueued)
+ asyncEventSender = new AsyncEventSender();
+ Thread th = new Thread(asyncEventSender);
+ th.setName("Discovery-AsyncEventSender");
+ th.setDaemon(true);
+ th.start();
+
+ if (previousView!=null && !isChanging) {
+ enqueueForAll(unInitializedEventListeners, EventFactory.newInitEvent(previousView));
+ eventListeners.addAll(unInitializedEventListeners);
+ unInitializedEventListeners.clear();
+ }
+ logger.debug("handleActivated: activated the ViewStateManager");
+ } finally {
+ lock.unlock();
+ logger.trace("handleActivated: finally");
}
}
- /** Note: no synchronization done in ViewStateManager, <b>must</b> be done externally **/
- public void handleActivated() {
- logger.debug("handleActivated: activating the ViewStateManager");
- activated = true;
-
- if (previousView!=null && !isChanging) {
- sendEvent(unInitializedEventListeners, newInitEvent(previousView));
- eventListeners.addAll(unInitializedEventListeners);
- unInitializedEventListeners.clear();
- }
- logger.debug("handleActivated: activated the ViewStateManager");
- }
+ /* (non-Javadoc)
+ * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#handleDeactivated()
+ */
+ @Override
+ public void handleDeactivated() {
+ logger.trace("handleDeactivated: start");
+ lock.lock();
+ try{
+ logger.debug("handleDeactivated: deactivating the ViewStateManager");
+ activated = false;
+ modCnt++;
+
+ if (asyncEventSender!=null) {
+ // it should always be not-null though
+ asyncEventSender.flushThenStop();
+ asyncEventSender = null;
+ }
- /** Simple factory method for creating a TOPOLOGY_INIT event with the given newView **/
- public static TopologyEvent newInitEvent(final BaseTopologyView newView) {
- if (newView==null) {
- throw new IllegalStateException("newView must not be null");
- }
- if (!newView.isCurrent()) {
- throw new IllegalStateException("newView must be current");
+ if (previousView!=null) {
+ previousView.setNotCurrent();
+ logger.trace("handleDeactivated: setting previousView to null");
+ previousView = null;
+ }
+ logger.trace("handleDeactivated: setting isChanging to false");
+ isChanging = false;
+
+ eventListeners.clear();
+ unInitializedEventListeners.clear();
+ logger.debug("handleDeactivated: deactivated the ViewStateManager");
+ } finally {
+ lock.unlock();
+ logger.trace("handleDeactivated: finally");
}
- return new TopologyEvent(Type.TOPOLOGY_INIT, null, newView);
}
- /** Simple factory method for creating a TOPOLOGY_CHANGING event with the given oldView **/
- public static TopologyEvent newChangingEvent(final BaseTopologyView oldView) {
- if (oldView==null) {
- throw new IllegalStateException("oldView must not be null");
- }
- if (oldView.isCurrent()) {
- throw new IllegalStateException("oldView must not be current");
+ /* (non-Javadoc)
+ * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#handleChanging()
+ */
+ @Override
+ public void handleChanging() {
+ logger.trace("handleChanging: start");
+ lock.lock();
+ try{
+
+ if (isChanging) {
+ // if isChanging: then this is no news
+ // hence: return asap
+ logger.debug("handleChanging: was already changing - ignoring.");
+ return;
+ }
+ modCnt++;
+
+ // whether activated or not: set isChanging to true now
+ logger.trace("handleChanging: setting isChanging to true");
+ isChanging = true;
+
+ if (!activated) {
+ // if not activated: we can only start sending events once activated
+ // hence returning here - after isChanging was set to true accordingly
+
+ // note however, that if !activated, there should be no eventListeners yet
+ // all of them should be in unInitializedEventListeners at the moment
+ // waiting for activate() and handleNewTopologyView
+ logger.debug("handleChanging: not yet activated - ignoring.");
+ return;
+ }
+
+ if (previousView==null) {
+ // then nothing further to do - this is a very early changing event
+ // before even the first view was available
+ logger.debug("handleChanging: no previousView set - ignoring.");
+ return;
+ }
+
+ logger.debug("handleChanging: sending TOPOLOGY_CHANGING to initialized listeners");
+ previousView.setNotCurrent();
+ enqueueForAll(eventListeners, EventFactory.newChangingEvent(previousView));
+ } finally {
+ lock.unlock();
+ logger.trace("handleChanging: finally");
}
- return new TopologyEvent(Type.TOPOLOGY_CHANGING, oldView, null);
}
- /** Simple factory method for creating a TOPOLOGY_CHANGED event with the given old and new views **/
- public static TopologyEvent newChangedEvent(final BaseTopologyView oldView, final BaseTopologyView newView) {
- if (oldView==null) {
- throw new IllegalStateException("oldView must not be null");
- }
- if (oldView.isCurrent()) {
- throw new IllegalStateException("oldView must not be current");
- }
+ /* (non-Javadoc)
+ * @see org.apache.sling.discovery.commons.providers.impl.ViewStateManager#handleNewView(org.apache.sling.discovery.commons.providers.BaseTopologyView)
+ */
+ @Override
+ public void handleNewView(final BaseTopologyView newView) {
+ logger.trace("handleNewView: start, newView={}", newView);
if (newView==null) {
- throw new IllegalStateException("newView must not be null");
+ throw new IllegalArgumentException("newView must not be null");
}
if (!newView.isCurrent()) {
- throw new IllegalStateException("newView must be current");
+ logger.debug("handleNewView: newView is not current - calling handleChanging.");
+ handleChanging();
+ return;// false;
}
- return new TopologyEvent(Type.TOPOLOGY_CHANGED, oldView, newView);
+ // paranoia-testing:
+ InstanceDescription localInstance = newView.getLocalInstance();
+ if (localInstance==null) {
+ throw new IllegalStateException("newView does not contain the local instance - hence cannot be current");
+ }
+ if (!localInstance.isLocal()) {
+ throw new IllegalStateException("newView's local instance is not isLocal - very unexpected - hence cannot be current");
+ }
+ logger.debug("handleNewView: newView is current, so trying with minEventDelayHandler...");
+ if (minEventDelayHandler!=null) {
+ if (minEventDelayHandler.handlesNewView(newView)) {
+ return;// true;
+ }
+ }
+ logger.debug("handleNewView: minEventDelayHandler not set or not applicable this time, invoking hanldeNewViewNonDelayed...");
+ /*return */handleNewViewNonDelayed(newView);
}
- /**
- * Must be called when the corresponding service (typically a DiscoveryService implementation)
- * is deactivated.
- * <p>
- * Will mark this manager as deactivated and flags the last available view as not current.
- * <p>
- * Note: no synchronization done in ViewStateManager, <b>must</b> be done externally
- */
- public void handleDeactivated() {
- logger.debug("handleDeactivated: deactivating the ViewStateManager");
- activated = false;
-
- if (previousView!=null) {
- previousView.setNotCurrent();
- previousView = null;
+ boolean handleNewViewNonDelayed(final BaseTopologyView newView) {
+ logger.trace("handleNewViewNonDelayed: start");
+ lock.lock();
+ try{
+ logger.debug("handleNewViewNonDelayed: start, newView={}", newView);
+ if (!newView.isCurrent()) {
+ logger.error("handleNewViewNonDelayed: newView must be current");
+ throw new IllegalArgumentException("newView must be current");
+ }
+ modCnt++;
+
+ if (!isChanging) {
+ // verify if there is actually a change between previousView and newView
+ // if there isn't, then there is not much point in sending a CHANGING/CHANGED tuple
+ // at all
+ if (previousView!=null && previousView.equals(newView)) {
+ // then nothing to send - the view has not changed, and we haven't
+ // sent the CHANGING event - so we should not do anything here
+ logger.debug("handleNewViewNonDelayed: we were not in changing state and new view matches old, so - ignoring");
+ return false;
+ }
+ logger.debug("handleNewViewNonDelayed: implicitly triggering a handleChanging as we were not in changing state");
+ handleChanging();
+ logger.debug("handleNewViewNonDelayed: implicitly triggering of a handleChanging done");
+ }
+
+ if (!activated) {
+ // then all we can do is to pass this on to previousView
+ logger.trace("handleNewViewNonDelayed: setting previousView to {}", newView);
+ previousView = newView;
+ // plus set the isChanging flag to false
+ logger.trace("handleNewViewNonDelayed: setting isChanging to false");
+ isChanging = false;
+
+ // other than that, we can't currently send any event, before activate
+ logger.debug("handleNewViewNonDelayed: not yet activated - ignoring");
+ return true;
+ }
+
+ // now check if the view indeed changed or if it was just the properties
+ if (!isChanging && isPropertiesDiff(newView)) {
+ // well then send a properties changed event only
+ // and that one does not go via consistencyservice
+ logger.info("handleNewViewNonDelayed: properties changed to: "+newView);
+ previousView.setNotCurrent();
+ enqueueForAll(eventListeners, EventFactory.newPropertiesChangedEvent(previousView, newView));
+ logger.trace("handleNewViewNonDelayed: setting previousView to {}", newView);
+ previousView = newView;
+ return true;
+ }
+
+ final boolean invokeConsistencyService;
+ if (consistencyService==null) {
+ logger.debug("handleNewViewNonDelayed: no consistencyService set - continuing directly.");
+ invokeConsistencyService = false;
+ } else if (previousView==null) {
+ // when there was no previous view, we cannot determine if
+ // any instance left
+ // so for safety reason: always invoke the consistencyservice
+ logger.debug("handleNewViewNonDelayed: no previousView set - invoking consistencyService");
+ invokeConsistencyService = true;
+ } else {
+ final InstancesDiff diff = new InstancesDiff(previousView, newView);
+ InstanceCollection removed = diff.removed();
+// Collection<InstanceDescription> c = removed.get();
+// Iterator<InstanceDescription> it = c.iterator();
+// while(it.hasNext()) {
+// logger.info("handleNewViewNonDelayed: removed: "+it.next());
+// }
+ InstanceCollection inClusterView = removed.
+ isInClusterView(newView.getLocalInstance().getClusterView());
+// c = removed.get();
+// it = c.iterator();
+// while(it.hasNext()) {
+// logger.info("handleNewViewNonDelayed: inClusterView: "+it.next());
+// }
+ final boolean anyInstanceLeftLocalCluster = inClusterView.
+ get().size()>0;
+ if (anyInstanceLeftLocalCluster) {
+ logger.debug("handleNewViewNonDelayed: anyInstanceLeftLocalCluster=true, hence invoking consistencyService next");
+ } else {
+ logger.debug("handleNewViewNonDelayed: anyInstanceLeftLocalCluster=false - continuing directly.");
+ }
+ invokeConsistencyService = anyInstanceLeftLocalCluster;
+ }
+
+ if (invokeConsistencyService) {
+ // if "instances from the local cluster have been removed"
+ // then:
+ // run the set consistencyService
+ final int lastModCnt = modCnt;
+ logger.debug("handleNewViewNonDelayed: invoking consistencyService (modCnt={})", modCnt);
+ consistencyService.sync(newView,
+ new Runnable() {
+
+ public void run() {
+ logger.trace("consistencyService.callback.run: start. acquiring lock...");
+ lock.lock();
+ try{
+ logger.debug("consistencyService.callback.run: lock aquired. (modCnt should be {}, is {})", lastModCnt, modCnt);
+ if (modCnt!=lastModCnt) {
+ logger.debug("consistencyService.callback.run: modCnt changed (from {} to {}) - ignoring",
+ lastModCnt, modCnt);
+ return;
+ }
+ // else:
+ doHandleConsistent(newView);
+ } finally {
+ lock.unlock();
+ logger.trace("consistencyService.callback.run: end.");
+ }
+ }
+
+ });
+ } else {
+ // otherwise we're either told not to use any ConsistencyService
+ // or using it is not applicable at this stage - so continue
+ // with sending the TOPOLOGY_CHANGED (or TOPOLOGY_INIT if there
+ // are any newly bound topology listeners) directly
+ doHandleConsistent(newView);
+ }
+ logger.debug("handleNewViewNonDelayed: end");
+ return true;
+ } finally {
+ lock.unlock();
+ logger.trace("handleNewViewNonDelayed: finally");
}
- isChanging = false;
-
- eventListeners.clear();
- unInitializedEventListeners.clear();
- logger.debug("handleDeactivated: deactivated the ViewStateManager");
}
-
- /** Note: no synchronization done in ViewStateManager, <b>must</b> be done externally **/
- public void handleChanging() {
- logger.debug("handleChanging: start");
- if (isChanging) {
- // if isChanging: then this is no news
- // hence: return asap
- logger.debug("handleChanging: was already changing - ignoring.");
- return;
+ protected boolean isPropertiesDiff(BaseTopologyView newView) {
+ if (previousView==null) {
+ return false;
}
-
- // whether activated or not: set isChanging to true now
- isChanging = true;
-
- if (!activated) {
- // if not activated: we can only start sending events once activated
- // hence returning here - after isChanging was set to true accordingly
-
- // note however, that if !activated, there should be no eventListeners yet
- // all of them should be in unInitializedEventListeners at the moment
- // waiting for activate() and handleNewTopologyView
- logger.debug("handleChanging: not yet activated - ignoring.");
- return;
+ if (newView==null) {
+ throw new IllegalArgumentException("newView must not be null");
}
-
- if (previousView==null) {
- // then nothing further to do - this is a very early changing event
- // before even the first view was available
- logger.debug("handleChanging: no previousView set - ignoring.");
- return;
+ if (previousView.getInstances().size()!=newView.getInstances().size()) {
+ return false;
}
-
- logger.debug("handleChanging: sending TOPOLOGY_CHANGING to initialized listeners");
- previousView.setNotCurrent();
- sendEvent(eventListeners, newChangingEvent(previousView));
- logger.debug("handleChanging: end");
- }
-
- /** Note: no synchronization done in ViewStateManager, <b>must</b> be done externally **/
- public void handleNewView(BaseTopologyView newView) {
- if (logger.isDebugEnabled()) {
- logger.debug("handleNewView: start, newView={}", newView);
+ if (previousView.equals(newView)) {
+ return false;
}
- if (!newView.isCurrent()) {
- logger.error("handleNewView: newView must be current");
- throw new IllegalArgumentException("newView must be current");
+ Set<String> newIds = new HashSet<String>();
+ for(InstanceDescription newInstance : newView.getInstances()) {
+ newIds.add(newInstance.getSlingId());
}
- if (!isChanging) {
- // verify if there is actually a change between previousView and newView
- // if there isn't, then there is not much point in sending a CHANGING/CHANGED tuple
- // at all
- if (previousView!=null && previousView.equals(newView)) {
- // then nothing to send - the view has not changed, and we haven't
- // sent the CHANGING event - so we should not do anything here
- logger.debug("handleNewView: we were not in changing state and new view matches old, so - ignoring");
- return;
+ for(InstanceDescription oldInstance : previousView.getInstances()) {
+ if (!newIds.contains(oldInstance.getSlingId())) {
+ return false;
}
- logger.debug("handleNewView: simulating a handleChanging as we were not in changing state");
- handleChanging();
- logger.debug("handleNewView: simulation of a handleChanging done");
}
+ return true;
+ }
- // whether activated or not: set isChanging to false, first thing
- isChanging = false;
-
- if (!activated) {
- // then all we can do is to pass this on to previoueView
- previousView = newView;
- // other than that, we can't currently send any event, before activate
- logger.debug("handleNewView: not yet activated - ignoring");
- return;
- }
+ private void doHandleConsistent(BaseTopologyView newView) {
+ logger.trace("doHandleConsistent: start");
+ // unset the isChanging flag
+ logger.trace("doHandleConsistent: setting isChanging to false");
+ isChanging = false;
+
if (previousView==null) {
// this is the first time handleNewTopologyView is called
if (eventListeners.size()>0) {
- logger.info("handleNewTopologyView: no previous view available even though listeners already got CHANGED event");
+ logger.info("doHandleConsistent: no previous view available even though listeners already got CHANGED event");
+ } else {
+ logger.debug("doHandleConsistent: no previous view and there are no event listeners yet. very quiet.");
}
-
+
// otherwise this is the normal case where there are uninitialized event listeners waiting below
-
+
} else {
- logger.debug("handleNewView: sending TOPOLOGY_CHANGED to initialized listeners");
+ logger.debug("doHandleConsistent: sending TOPOLOGY_CHANGED to initialized listeners");
previousView.setNotCurrent();
- sendEvent(eventListeners, newChangedEvent(previousView, newView));
+ enqueueForAll(eventListeners, EventFactory.newChangedEvent(previousView, newView));
}
if (unInitializedEventListeners.size()>0) {
// then there were bindTopologyEventListener calls coming in while
// we were in CHANGING state - so we must send those the INIT they were
// waiting for oh so long
- if (logger.isDebugEnabled()) {
- logger.debug("handleNewView: sending TOPOLOGY_INIT to uninitialized listeners ({})",
- unInitializedEventListeners.size());
- }
- sendEvent(unInitializedEventListeners, newInitEvent(newView));
+ logger.debug("doHandleConsistent: sending TOPOLOGY_INIT to uninitialized listeners ({})",
+ unInitializedEventListeners.size());
+ enqueueForAll(unInitializedEventListeners, EventFactory.newInitEvent(newView));
eventListeners.addAll(unInitializedEventListeners);
unInitializedEventListeners.clear();
}
+ logger.trace("doHandleConsistent: setting previousView to {}", newView);
previousView = newView;
- logger.debug("handleNewView: end");
+ logger.trace("doHandleConsistent: end");
+ }
+
+ /** get-hook for testing only! **/
+ AsyncEventSender getAsyncEventSender() {
+ return asyncEventSender;
}
}
Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.spi;
+
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+
+/**
+ * The ConsistencyService can be used to establish strong
+ * consistency with the underlying (eventually consistent) repository in use.
+ * <p>
+ * The issue is described at length in SLING-4627 - the short
+ * version is composed of two different factors:
+ * <ul>
+ * <li>concurrency of discovery service and its listeners on the
+ * different instances: upon a change in the topology it is
+ * important that one listener doesn't do activity based on
+ * an older incarnation of the topologyView than another listener
+ * on another instance. They should change from one view to the
+ * next view based on the same repository state.
+ * </li>
+ * <li>when an instance leaves the cluster (eg crashes), then
+ * depending on the repository it might have left a backlog around
+ * which would yet have to be processed and which could contain
+ * relevant topology-dependent data that should be waited for
+ * to settle before the topology-dependent activity can continue
+ * </li>
+ * </ul>
+ * Both of these aspects are handled by this ConsistencyService.
+ * The former one by introducing a 'sync token' that gets written
+ * to the repository; upon receiving it, the peers know
+ * that the writing instance is aware of the ongoing change, that
+ * the writing instance has sent out TOPOLOGY_CHANGING and that
+ * the receiving instance has seen all changes that the writing
+ * instance did prior to sending a TOPOLOGY_CHANGING.
+ * The latter aspect is achieved by making use of the underlying
+ * repository: eg on Oak the 'discovery lite' descriptor is
+ * used to determine if any instance not part of the new view
+ * is still being deactivated (eg has backlog). So this second
+ * part is repository dependent.
+ */
+public interface ConsistencyService {
+
+ /**
+ * Starts the synchronization process and calls the provided
+ * callback upon completion.
+ * <p>
+ * sync() is not thread-safe and should not be invoked
+ * concurrently.
+ * <p>
+ * If sync() gets called before a previous invocation finished,
+ * that previous invocation will be discarded, ie the callback
+ * of the previous invocation will no longer be called.
+ * <p>
+ * The synchronization process consists of making sure that
+ * the repository has processed any potential backlog of instances
+ * that are no longer part of the provided, new view. Plus
+ * it writes a 'sync-token' to a well-defined location, with
+ * all peers doing the same, and upon seeing all other sync-tokens
+ * declares successful completion - at which point it calls the
+ * callback.run().
+ * @param view the view which all instances in the local cluster
+ * should agree on having seen
+ * @param callback the runnable which should be called after
+ * successful syncing
+ */
+ void sync(BaseTopologyView view, Runnable callback);
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/ConsistencyService.java
------------------------------------------------------------------------------
svn:eol-style = native