You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by cz...@apache.org on 2013/05/13 16:17:28 UTC
svn commit: r1481862 - in
/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl:
DiscoveryServiceImpl.java cluster/ClusterViewServiceImpl.java
Author: cziegeler
Date: Mon May 13 14:17:27 2013
New Revision: 1481862
URL: http://svn.apache.org/r1481862
Log:
SLING-2868 : Improve and cleanup implementation : Simplify locking in DiscoveryViewServiceImpl and DiscoveryServiceImpl
Modified:
sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java?rev=1481862&r1=1481861&r2=1481862&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java Mon May 13 14:17:27 2013
@@ -23,14 +23,13 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.UUID;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -122,7 +121,7 @@ public class DiscoveryServiceImpl implem
private String slingId;
/** the old view previously valid and sent to the TopologyEventListeners **/
- private TopologyViewImpl oldView = null;
+ private TopologyViewImpl oldView;
/** whether or not there is a delayed event sending pending **/
private boolean delayedEventPending = false;
@@ -184,14 +183,14 @@ public class DiscoveryServiceImpl implem
logger.debug("DiscoveryServiceImpl activated.");
}
-
- private void sendTopologyEvent(TopologyEventListener da, TopologyEvent event) {
+
+ private void sendTopologyEvent(final TopologyEventListener da, final TopologyEvent event) {
if (logger.isDebugEnabled()) {
- logger.debug("sendTopologyEvent: sending topologyEvent "+event+", to "+da);
+ logger.debug("sendTopologyEvent: sending topologyEvent {}, to {}", event, da);
}
try{
da.handleTopologyEvent(event);
- } catch(Exception e) {
+ } catch(final Exception e) {
logger.warn("sendTopologyEvent: handler threw exception. handler: "+da+", exception: "+e, e);
}
}
@@ -208,7 +207,7 @@ public class DiscoveryServiceImpl implem
}
/**
- * bind a discovery aware
+ * bind a topology event listener
*/
protected void bindTopologyEventListener(final TopologyEventListener eventListener) {
@@ -217,7 +216,7 @@ public class DiscoveryServiceImpl implem
boolean activated = false;
synchronized (lock) {
- List<TopologyEventListener> currentList = new ArrayList<TopologyEventListener>(
+ final List<TopologyEventListener> currentList = new ArrayList<TopologyEventListener>(
Arrays.asList(eventListeners));
currentList.add(eventListener);
this.eventListeners = currentList
@@ -232,17 +231,17 @@ public class DiscoveryServiceImpl implem
}
/**
- * Unbind a discovery aware
+ * Unbind a topology event listener
*/
- protected void unbindTopologyEventListener(final TopologyEventListener clusterAware) {
+ protected void unbindTopologyEventListener(final TopologyEventListener eventListener) {
logger.debug("unbindTopologyEventListener: Releasing TopologyEventListener {}",
- clusterAware);
+ eventListener);
synchronized (lock) {
- List<TopologyEventListener> currentList = new ArrayList<TopologyEventListener>(
+ final List<TopologyEventListener> currentList = new ArrayList<TopologyEventListener>(
Arrays.asList(eventListeners));
- currentList.remove(clusterAware);
+ currentList.remove(eventListener);
this.eventListeners = currentList
.toArray(new TopologyEventListener[currentList.size()]);
}
@@ -251,79 +250,65 @@ public class DiscoveryServiceImpl implem
/**
* Bind a new property provider.
*/
- private void bindPropertyProvider(final PropertyProvider propertyProvider,
- final Map<String, Object> props) {
+ protected void bindPropertyProvider(final PropertyProvider propertyProvider,
+ final Map<String, Object> props) {
logger.debug("bindPropertyProvider: Binding PropertyProvider {}",
propertyProvider);
- final TopologyEventListener[] awares;
synchronized (lock) {
- final ProviderInfo info = new ProviderInfo(propertyProvider, props);
- this.providerInfos.add(info);
- Collections.sort(this.providerInfos);
- this.doUpdateProperties();
- if (activated) {
- awares = this.eventListeners;
- } else {
- awares = null;
- }
- }
- if (awares != null) {
- logger.debug("bindPropertyProvider: calling handlePotentialTopologyChange.");
- handlePotentialTopologyChange();
+ this.bindPropertyProviderInteral(propertyProvider, props);
}
}
/**
- * Update a property provider.
+ * Bind a new property provider.
*/
- @SuppressWarnings("unused")
- private void updatedPropertyProvider(
- final PropertyProvider propertyProvider,
+ private void bindPropertyProviderInteral(final PropertyProvider propertyProvider,
final Map<String, Object> props) {
+ final ProviderInfo info = new ProviderInfo(propertyProvider, props);
+ this.providerInfos.add(info);
+ Collections.sort(this.providerInfos);
+ this.doUpdateProperties();
+ handlePotentialTopologyChange();
+ }
+
+ /**
+ * Update a property provider.
+ */
+ protected void updatedPropertyProvider(final PropertyProvider propertyProvider,
+ final Map<String, Object> props) {
logger.debug("bindPropertyProvider: Updating PropertyProvider {}",
propertyProvider);
- this.unbindPropertyProvider(propertyProvider, props, false);
- this.bindPropertyProvider(propertyProvider, props);
- if (heartbeatHandler!=null) {
- heartbeatHandler.triggerHeartbeat();
+ synchronized (lock) {
+ this.unbindPropertyProviderInternal(propertyProvider, props, false);
+ this.bindPropertyProviderInteral(propertyProvider, props);
}
}
/**
* Unbind a property provider
*/
- @SuppressWarnings("unused")
- private void unbindPropertyProvider(
- final PropertyProvider propertyProvider,
- final Map<String, Object> props) {
- this.unbindPropertyProvider(propertyProvider, props, true);
+ protected void unbindPropertyProvider(final PropertyProvider propertyProvider,
+ final Map<String, Object> props) {
+ logger.debug("unbindPropertyProvider: Releasing PropertyProvider {}",
+ propertyProvider);
+ synchronized (lock) {
+ this.unbindPropertyProviderInternal(propertyProvider, props, true);
+ }
}
/**
* Unbind a property provider
*/
- private void unbindPropertyProvider(
+ private void unbindPropertyProviderInternal(
final PropertyProvider propertyProvider,
- final Map<String, Object> props, final boolean inform) {
- logger.debug("unbindPropertyProvider: Releasing PropertyProvider {}",
- propertyProvider);
+ final Map<String, Object> props, final boolean update) {
- final TopologyEventListener[] awares;
- synchronized (lock) {
- final ProviderInfo info = new ProviderInfo(propertyProvider, props);
- this.providerInfos.remove(info);
+ final ProviderInfo info = new ProviderInfo(propertyProvider, props);
+ if ( this.providerInfos.remove(info) && update ) {
this.doUpdateProperties();
- if (activated) {
- awares = this.eventListeners;
- } else {
- awares = null;
- }
- }
- if (inform && awares != null) {
- logger.debug("unbindPropertyProvider: calling handlePotentialTopologyChange.");
- handlePotentialTopologyChange();
+ this.handlePotentialTopologyChange();
}
}
@@ -361,12 +346,10 @@ public class DiscoveryServiceImpl implem
resourceResolver,
config.getClusterInstancesPath()
+ "/" + slingId + "/properties");
-
+
final ModifiableValueMap myInstanceMap = myInstance.adaptTo(ModifiableValueMap.class);
final Set<String> keys = new HashSet<String>(myInstanceMap.keySet());
- final Iterator<String> it1 = keys.iterator();
- while(it1.hasNext()) {
- final String key = it1.next();
+ for(final String key : keys) {
if (newProps.containsKey(key)) {
// perfect
continue;
@@ -379,12 +362,9 @@ public class DiscoveryServiceImpl implem
}
}
- for (Iterator<Entry<String, String>> it2 = newProps.entrySet()
- .iterator(); it2.hasNext();) {
- Entry<String, String> entry = it2.next();
+ for(final Entry<String, String> entry : newProps.entrySet()) {
if (logger.isDebugEnabled()) {
- logger.debug("doUpdateProperties: " + entry.getKey() + "="
- + entry.getValue());
+ logger.debug("doUpdateProperties: {}={}", entry.getKey(), entry.getValue());
}
myInstanceMap.put(entry.getKey(), entry.getValue());
}
@@ -453,125 +433,113 @@ public class DiscoveryServiceImpl implem
* oldView/newView as well.
*/
private void handlePotentialTopologyChange() {
-
- synchronized (lock) {
- if (!activated) {
- // ignore this call then - an early call to issue
- // a topologyevent before even activated
- logger.debug("handlePotentialTopologyChange: ignoring early change before activate finished.");
- return;
- }
- if (delayedEventPending) {
- logger.debug("handlePotentialTopologyChange: ignoring potential change since a delayed event is pending.");
- return;
- }
- if (oldView == null) {
- throw new IllegalStateException("oldView must not be null");
- }
- TopologyViewImpl newView = (TopologyViewImpl) getTopology();
- TopologyViewImpl oldView = this.oldView;
+ if (!activated) {
+ // ignore this call then - an early call to issue
+ // a topologyevent before even activated
+ logger.debug("handlePotentialTopologyChange: ignoring early change before activate finished.");
+ return;
+ }
+ if (delayedEventPending) {
+ logger.debug("handlePotentialTopologyChange: ignoring potential change since a delayed event is pending.");
+ return;
+ }
+ if (oldView == null) {
+ throw new IllegalStateException("oldView must not be null");
+ }
+ TopologyViewImpl newView = (TopologyViewImpl) getTopology();
+ TopologyViewImpl oldView = this.oldView;
- Type difference = newView.compareTopology(oldView);
- if (difference == null) {
- // then dont send any event then
- logger.debug("handlePotentialTopologyChange: identical views. not informing listeners.");
- return;
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("handlePotentialTopologyChange: difference: "+difference+
- ", oldView="+oldView+", newView="+newView);
- }
+ Type difference = newView.compareTopology(oldView);
+ if (difference == null) {
+ // then dont send any event then
+ logger.debug("handlePotentialTopologyChange: identical views. not informing listeners.");
+ return;
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("handlePotentialTopologyChange: difference: {}, oldView={}, newView={}",
+ new Object[] {difference, oldView, newView});
}
+ }
- oldView.markOld();
- if (difference!=Type.TOPOLOGY_CHANGED) {
- for (final TopologyEventListener da : eventListeners) {
- sendTopologyEvent(da, new TopologyEvent(difference, oldView,
- newView));
- }
- } else { // TOPOLOGY_CHANGED
-
- // send a TOPOLOGY_CHANGING first
- for (final TopologyEventListener da : eventListeners) {
- sendTopologyEvent(da, new TopologyEvent(Type.TOPOLOGY_CHANGING, oldView,
- null));
- }
-
- if (config.getMinEventDelay()>0) {
- // then delay the sending of the next event
- logger.debug("handlePotentialTopologyChange: delaying event sending to avoid event flooding");
-
- if (runAfter(config.getMinEventDelay() /*seconds*/ , new Runnable() {
-
- public void run() {
- synchronized(lock) {
- delayedEventPending = false;
- logger.debug("handlePotentialTopologyChange: sending delayed event now");
- if (!activated) {
- logger.debug("handlePotentialTopologyChange: no longer activated. not sending delayed event");
- return;
- }
- final TopologyViewImpl newView = (TopologyViewImpl) getTopology();
- // irrespective of the difference, send the latest topology
- // via a topology_changed event (since we already sent a changing)
- for (final TopologyEventListener da : eventListeners) {
- sendTopologyEvent(da, new TopologyEvent(Type.TOPOLOGY_CHANGED,
- DiscoveryServiceImpl.this.oldView, newView));
- }
- DiscoveryServiceImpl.this.oldView = newView;
- }
- if (heartbeatHandler!=null) {
- // trigger a heartbeat 'now' to pass it on to the topology asap
- heartbeatHandler.triggerHeartbeat();
+ oldView.markOld();
+ if (difference!=Type.TOPOLOGY_CHANGED) {
+ for (final TopologyEventListener da : eventListeners) {
+ sendTopologyEvent(da, new TopologyEvent(difference, oldView,
+ newView));
+ }
+ } else { // TOPOLOGY_CHANGED
+
+ // send a TOPOLOGY_CHANGING first
+ for (final TopologyEventListener da : eventListeners) {
+ sendTopologyEvent(da, new TopologyEvent(Type.TOPOLOGY_CHANGING, oldView,
+ null));
+ }
+
+ if (config.getMinEventDelay()>0) {
+ // then delay the sending of the next event
+ logger.debug("handlePotentialTopologyChange: delaying event sending to avoid event flooding");
+
+ if (runAfter(config.getMinEventDelay() /*seconds*/ , new Runnable() {
+
+ public void run() {
+ synchronized(lock) {
+ delayedEventPending = false;
+ logger.debug("handlePotentialTopologyChange: sending delayed event now");
+ if (!activated) {
+ logger.debug("handlePotentialTopologyChange: no longer activated. not sending delayed event");
+ return;
+ }
+ final TopologyViewImpl newView = (TopologyViewImpl) getTopology();
+ // irrespective of the difference, send the latest topology
+ // via a topology_changed event (since we already sent a changing)
+ for (final TopologyEventListener da : eventListeners) {
+ sendTopologyEvent(da, new TopologyEvent(Type.TOPOLOGY_CHANGED,
+ DiscoveryServiceImpl.this.oldView, newView));
}
+ DiscoveryServiceImpl.this.oldView = newView;
+ }
+ if (heartbeatHandler!=null) {
+ // trigger a heartbeat 'now' to pass it on to the topology asap
+ heartbeatHandler.triggerHeartbeat();
}
- })) {
- delayedEventPending = true;
- logger.debug("handlePotentialTopologyChange: delaying of event triggered.");
- return;
- } else {
- logger.debug("handlePotentialTopologyChange: delaying did not work for some reason.");
}
- }
-
- // otherwise, send the TOPOLOGY_CHANGED now
- for (final TopologyEventListener da : eventListeners) {
- sendTopologyEvent(da, new TopologyEvent(Type.TOPOLOGY_CHANGED, oldView,
- newView));
+ })) {
+ delayedEventPending = true;
+ logger.debug("handlePotentialTopologyChange: delaying of event triggered.");
+ return;
+ } else {
+ logger.debug("handlePotentialTopologyChange: delaying did not work for some reason.");
}
- }
+ }
- this.oldView = newView;
+ // otherwise, send the TOPOLOGY_CHANGED now
+ for (final TopologyEventListener da : eventListeners) {
+ sendTopologyEvent(da, new TopologyEvent(Type.TOPOLOGY_CHANGED, oldView,
+ newView));
+ }
}
+
+ this.oldView = newView;
if (heartbeatHandler!=null) {
// trigger a heartbeat 'now' to pass it on to the topology asap
heartbeatHandler.triggerHeartbeat();
}
}
- /**
+ /**
* run the runnable after the indicated number of seconds, once.
- * @return true if the scheduling of the runnable worked, false otherwise
+ * @return true if the scheduling of the runnable worked, false otherwise
*/
private boolean runAfter(int seconds, final Runnable runnable) {
- if (scheduler==null) {
+ final Scheduler theScheduler = scheduler;
+ if (theScheduler == null) {
logger.info("runAfter: no scheduler set");
return false;
}
logger.debug("runAfter: trying with scheduler.fireJob");
- final String id = UUID.randomUUID().toString();
+ final Date date = new Date(System.currentTimeMillis() + seconds * 1000);
try {
- final Scheduler theScheduler = scheduler;
- scheduler.addPeriodicJob(id, new Runnable() {
-
- public void run() {
- try{
- runnable.run();
- } finally {
- theScheduler.removeJob(id);
- }
- }
- }, null, seconds, false);
+ theScheduler.fireJobAt(null, runnable, null, date);
return true;
} catch (Exception e) {
logger.info("runAfter: could not schedule a job: "+e);
@@ -585,7 +553,7 @@ public class DiscoveryServiceImpl implem
private final static class ProviderInfo implements Comparable<ProviderInfo> {
public final PropertyProvider provider;
- public final Map<String, Object> serviceProps;
+ public final Object propertyProperties;
public final int ranking;
public final long serviceId;
public final Map<String, String> properties = new HashMap<String, String>();
@@ -593,7 +561,7 @@ public class DiscoveryServiceImpl implem
public ProviderInfo(final PropertyProvider provider,
final Map<String, Object> serviceProps) {
this.provider = provider;
- this.serviceProps = serviceProps;
+ this.propertyProperties = serviceProps.get(PropertyProvider.PROPERTY_PROPERTIES);
final Object sr = serviceProps.get(Constants.SERVICE_RANKING);
if (sr == null || !(sr instanceof Integer)) {
this.ranking = 0;
@@ -606,15 +574,13 @@ public class DiscoveryServiceImpl implem
public void refreshProperties() {
properties.clear();
- final Object namesObj = serviceProps
- .get(PropertyProvider.PROPERTY_PROPERTIES);
- if (namesObj instanceof String) {
- final String val = provider.getProperty((String) namesObj);
+ if (this.propertyProperties instanceof String) {
+ final String val = provider.getProperty((String) this.propertyProperties);
if (val != null) {
- this.properties.put((String) namesObj, val);
+ this.properties.put((String) this.propertyProperties, val);
}
- } else if (namesObj instanceof String[]) {
- for (final String name : (String[]) namesObj) {
+ } else if (this.propertyProperties instanceof String[]) {
+ for (final String name : (String[]) this.propertyProperties) {
final String val = provider.getProperty(name);
if (val != null) {
this.properties.put(name, val);
@@ -656,7 +622,9 @@ public class DiscoveryServiceImpl implem
*/
public void handleTopologyChanged() {
logger.debug("handleTopologyChanged: calling handlePotentialTopologyChange.");
- handlePotentialTopologyChange();
+ synchronized ( this.lock ) {
+ handlePotentialTopologyChange();
+ }
}
}
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java?rev=1481862&r1=1481861&r2=1481862&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java Mon May 13 14:17:27 2013
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.UUID;
+import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
@@ -38,7 +39,6 @@ import org.apache.sling.discovery.impl.c
import org.apache.sling.discovery.impl.common.resource.EstablishedClusterView;
import org.apache.sling.discovery.impl.common.resource.IsolatedInstanceDescription;
import org.apache.sling.settings.SlingSettingsService;
-import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -75,7 +75,8 @@ public class ClusterViewServiceImpl impl
return isolatedClusterViewId;
}
- protected void activate(final ComponentContext context) {
+ @Activate
+ protected void activate() {
ResourceResolver resourceResolver = null;
try {
resourceResolver = resourceResolverFactory