You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2013/04/26 13:08:29 UTC

svn commit: r1476138 [2/2] - in /sling/trunk/contrib/extensions/discovery/impl: ./ src/main/java/org/apache/sling/discovery/impl/ src/main/java/org/apache/sling/discovery/impl/cluster/ src/main/java/org/apache/sling/discovery/impl/cluster/voting/ src/m...

Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java Fri Apr 26 11:08:29 2013
@@ -28,13 +28,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.discovery.impl.Config;
 import org.apache.sling.discovery.impl.cluster.ClusterViewService;
 import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry;
-import org.apache.sling.discovery.impl.topology.connector.TopologyConnectorClientInformation.OriginInfo;
 import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,13 +58,24 @@ public class ConnectorRegistryImpl imple
     /** the local port is added to the announcement as the serverInfo object **/
     private String port = "";
 
-    protected void activate(ComponentContext cc) {
+    @Activate
+    protected void activate(final ComponentContext cc) {
         port = cc.getBundleContext().getProperty("org.osgi.service.http.port");
     }
-
-    public TopologyConnectorClientInformation registerOutgoingConnection(
-            final ClusterViewService clusterViewService, final URL connectorUrl,
-            final OriginInfo originInfo) {
+    
+    @Deactivate
+    protected void deactivate() {
+        synchronized (outgoingClientsMap) {
+            for (Iterator<TopologyConnectorClient> it = outgoingClientsMap.values().iterator(); it.hasNext();) {
+                final TopologyConnectorClient client = it.next();
+                client.disconnect();
+                it.remove();
+            }
+        }
+    }
+    
+    public TopologyConnectorClientInformation registerOutgoingConnector(
+            final ClusterViewService clusterViewService, final URL connectorUrl) {
         if (announcementRegistry == null) {
             logger.error("registerOutgoingConnection: announcementRegistry is null");
             return null;
@@ -86,7 +98,7 @@ public class ConnectorRegistryImpl imple
                 serverInfo = "localhost:" + port;
             }
             client = new TopologyConnectorClient(clusterViewService,
-                    announcementRegistry, config, connectorUrl, originInfo,
+                    announcementRegistry, config, connectorUrl,
                     serverInfo);
             outgoingClientsMap.put(client.getId(), client);
         }
@@ -94,7 +106,7 @@ public class ConnectorRegistryImpl imple
         return client;
     }
 
-    public Collection<TopologyConnectorClientInformation> listOutgoingConnections() {
+    public Collection<TopologyConnectorClientInformation> listOutgoingConnectors() {
         final List<TopologyConnectorClientInformation> result = new ArrayList<TopologyConnectorClientInformation>();
         synchronized (outgoingClientsMap) {
             result.addAll(outgoingClientsMap.values());
@@ -102,7 +114,7 @@ public class ConnectorRegistryImpl imple
         return result;
     }
 
-    public boolean unregisterOutgoingConnection(final String id) {
+    public boolean unregisterOutgoingConnector(final String id) {
         if (id == null || id.length() == 0) {
             throw new IllegalArgumentException("id must not be null");
         }
@@ -115,20 +127,7 @@ public class ConnectorRegistryImpl imple
         }
     }
 
-    public boolean pingOutgoingConnection(final String id) {
-        if (id == null || id.length() == 0) {
-            throw new IllegalArgumentException("id must not be null");
-        }
-        synchronized (outgoingClientsMap) {
-            TopologyConnectorClient client = outgoingClientsMap.get(id);
-            if (client != null) {
-                client.ping();
-            }
-            return client != null;
-        }
-    }
-
-    public void pingOutgoingConnections() {
+    public void pingOutgoingConnectors() {
         List<TopologyConnectorClient> outgoingTemplatesClone;
         synchronized (outgoingClientsMap) {
             outgoingTemplatesClone = new ArrayList<TopologyConnectorClient>(

Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java Fri Apr 26 11:08:29 2013
@@ -23,12 +23,16 @@ import java.net.URL;
 import java.util.Iterator;
 import java.util.UUID;
 
+import javax.servlet.http.HttpServletResponse;
+
 import org.apache.commons.httpclient.Credentials;
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.URIException;
 import org.apache.commons.httpclient.UsernamePasswordCredentials;
 import org.apache.commons.httpclient.auth.AuthScope;
-import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.DeleteMethod;
+import org.apache.commons.httpclient.methods.PutMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
 import org.apache.sling.commons.json.JSONException;
 import org.apache.sling.discovery.InstanceDescription;
 import org.apache.sling.discovery.impl.Config;
@@ -66,9 +70,6 @@ public class TopologyConnectorClient imp
     /** the last inherited announcement **/
     private Announcement lastInheritedAnnouncement;
 
-    /** the information as to where this connector came from **/
-    private final OriginInfo originInfo;
-
     /** the information about this server **/
     private final String serverInfo;
     
@@ -77,7 +78,7 @@ public class TopologyConnectorClient imp
 
     TopologyConnectorClient(final ClusterViewService clusterViewService,
             final AnnouncementRegistry announcementRegistry, final Config config,
-            final URL connectorUrl, final OriginInfo originInfo, final String serverInfo) {
+            final URL connectorUrl, final String serverInfo) {
         if (clusterViewService == null) {
             throw new IllegalArgumentException(
                     "clusterViewService must not be null");
@@ -92,24 +93,22 @@ public class TopologyConnectorClient imp
         if (connectorUrl == null) {
             throw new IllegalArgumentException("connectorUrl must not be null");
         }
-        if (originInfo == null) {
-            throw new IllegalArgumentException("originInfo must not be null");
-        }
         this.clusterViewService = clusterViewService;
         this.announcementRegistry = announcementRegistry;
         this.config = config;
         this.connectorUrl = connectorUrl;
-        this.originInfo = originInfo;
         this.serverInfo = serverInfo;
         this.id = UUID.randomUUID();
     }
 
     /** ping the server and pass the announcements between the two **/
     void ping() {
-        logger.debug("ping: connectorUrl=" + connectorUrl);
+        final String uri = connectorUrl.toString()+"."+clusterViewService.getSlingId()+".json";
+    	if (logger.isDebugEnabled()) {
+    		logger.debug("ping: connectorUrl=" + connectorUrl + ", complete uri=" + uri);
+    	}
         HttpClient httpClient = new HttpClient();
-        PostMethod method = new PostMethod(connectorUrl.toString());
-
+        PutMethod method = new PutMethod(uri);
         try {
             String userInfo = connectorUrl.getUserInfo();
             if (userInfo != null) {
@@ -121,7 +120,6 @@ public class TopologyConnectorClient imp
 
             Announcement topologyAnnouncement = new Announcement(
                     clusterViewService.getSlingId());
-            topologyAnnouncement.setOriginInfo(originInfo);
             topologyAnnouncement.setServerInfo(serverInfo);
             topologyAnnouncement.setLocalCluster(clusterViewService
                     .getClusterView());
@@ -147,37 +145,60 @@ public class TopologyConnectorClient imp
             });
             final String p = topologyAnnouncement.asJSON();
 
-            logger.debug("ping: topologyAnnouncement json is: " + p);
-            method.addParameter("topologyAnnouncement", p);
+        	if (logger.isDebugEnabled()) {
+        		logger.debug("ping: topologyAnnouncement json is: " + p);
+        	}
+            method.setRequestEntity(new StringRequestEntity(p, "application/json", "UTF-8"));
             httpClient.executeMethod(method);
-            logger.debug("ping: done. code=" + method.getStatusCode() + " - "
-                    + method.getStatusText());
+        	if (logger.isDebugEnabled()) {
+	            logger.debug("ping: done. code=" + method.getStatusCode() + " - "
+	                    + method.getStatusText());
+        	}
             lastStatusCode = method.getStatusCode();
-            if (method.getStatusCode()==200) {
+            if (method.getStatusCode()==HttpServletResponse.SC_OK) {
                 String responseBody = method.getResponseBodyAsString(16*1024*1024); // limiting to 16MB, should be way enough
-                logger.debug("ping: response body=" + responseBody);
-                Announcement inheritedAnnouncement = Announcement
-                        .fromJSON(responseBody);
-                inheritedAnnouncement.setInherited(true);
-                if (!announcementRegistry
-                        .registerAnnouncement(inheritedAnnouncement)) {
-                    logger.info("ping: connector response is from an instance which I already see in my topology"
-                            + inheritedAnnouncement);
+            	if (logger.isDebugEnabled()) {
+            		logger.debug("ping: response body=" + responseBody);
+            	}
+                if (responseBody!=null && responseBody.length()>0) {
+                    Announcement inheritedAnnouncement = Announcement
+                            .fromJSON(responseBody);
+                    if (inheritedAnnouncement.isLoop()) {
+                    	if (logger.isDebugEnabled()) {
+	                        logger.debug("ping: connector response indicated a loop detected. not registering this announcement from "+
+	                                    inheritedAnnouncement.getOwnerId());
+                    	}
+                    } else {
+                        inheritedAnnouncement.setInherited(true);
+                        if (!announcementRegistry
+                                .registerAnnouncement(inheritedAnnouncement)) {
+                        	if (logger.isDebugEnabled()) {
+	                            logger.debug("ping: connector response is from an instance which I already see in my topology"
+	                                    + inheritedAnnouncement);
+                        	}
+                            lastInheritedAnnouncement = null;
+                            return;
+                        }
+                    }
+                    lastInheritedAnnouncement = inheritedAnnouncement;
+                } else {
                     lastInheritedAnnouncement = null;
-                    return;
                 }
-                lastInheritedAnnouncement = inheritedAnnouncement;
             } else {
                 lastInheritedAnnouncement = null;
             }
         } catch (URIException e) {
             logger.warn("ping: Got URIException: " + e);
+            lastInheritedAnnouncement = null;
         } catch (IOException e) {
             logger.warn("ping: got IOException: " + e);
+            lastInheritedAnnouncement = null;
         } catch (JSONException e) {
             logger.warn("ping: got JSONException: " + e);
+            lastInheritedAnnouncement = null;
         } catch (RuntimeException re) {
-            logger.error("ping: got RuntimeException: " + re, re);
+            logger.warn("ping: got RuntimeException: " + re, re);
+            lastInheritedAnnouncement = null;
         }
     }
 
@@ -188,6 +209,14 @@ public class TopologyConnectorClient imp
     public URL getConnectorUrl() {
         return connectorUrl;
     }
+    
+    public boolean representsLoop() {
+        if (lastInheritedAnnouncement == null) {
+            return false;
+        } else {
+            return lastInheritedAnnouncement.isLoop();
+        }
+    }
 
     public boolean isConnected() {
         if (lastInheritedAnnouncement == null) {
@@ -205,17 +234,16 @@ public class TopologyConnectorClient imp
         }
     }
 
-    public OriginInfo getOriginInfo() {
-        return originInfo;
-    }
-
     public String getId() {
         return id.toString();
     }
 
     /** Disconnect this connector **/
     public void disconnect() {
-        logger.debug("disconnect: connectorUrl=" + connectorUrl);
+        final String uri = connectorUrl.toString()+"."+clusterViewService.getSlingId()+".json";
+    	if (logger.isDebugEnabled()) {
+    		logger.debug("disconnect: connectorUrl=" + connectorUrl + ", complete uri="+uri);
+    	}
 
         if (lastInheritedAnnouncement != null) {
             announcementRegistry
@@ -224,7 +252,7 @@ public class TopologyConnectorClient imp
         }
 
         HttpClient httpClient = new HttpClient();
-        PostMethod method = new PostMethod(connectorUrl.toString());
+        DeleteMethod method = new DeleteMethod(uri);
 
         try {
             String userInfo = connectorUrl.getUserInfo();
@@ -235,11 +263,13 @@ public class TopologyConnectorClient imp
                                 .getURI().getPort()), c);
             }
 
-            method.addParameter("topologyDisconnect",
-                    clusterViewService.getSlingId());
             httpClient.executeMethod(method);
-            logger.debug("disconnect: done. code=" + method.getStatusCode()
-                    + " - " + method.getStatusText());
+        	if (logger.isDebugEnabled()) {
+	            logger.debug("disconnect: done. code=" + method.getStatusCode()
+	                    + " - " + method.getStatusText());
+        	}
+            // ignoring the actual statuscode though as there's little we can
+            // do about it after this point
         } catch (URIException e) {
             logger.warn("disconnect: Got URIException: " + e);
         } catch (IOException e) {

Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java Fri Apr 26 11:08:29 2013
@@ -25,12 +25,6 @@ import java.net.URL;
  */
 public interface TopologyConnectorClientInformation {
 
-    enum OriginInfo {
-        Config, // this connector was created via config
-        WebConsole, // this connector was created via the wbconsole
-        Programmatically // this connector was created programmatically
-    }
-
     /** the endpoint url where this connector is connecting to **/
     URL getConnectorUrl();
 
@@ -40,12 +34,13 @@ public interface TopologyConnectorClient
     /** whether or not this connector was able to successfully connect **/
     boolean isConnected();
 
+    /** whether or not the counterpart of this connector has detected a loop in the topology connectors **/
+    boolean representsLoop();
+    
     /** the sling id of the remote end **/
     String getRemoteSlingId();
 
     /** the unique id of this connector **/
     String getId();
 
-    /** the information about how this connector was created **/
-    OriginInfo getOriginInfo();
 }

Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java Fri Apr 26 11:08:29 2013
@@ -18,18 +18,21 @@
  */
 package org.apache.sling.discovery.impl.topology.connector;
 
+import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.HashSet;
 import java.util.Set;
 
 import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
 
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.sling.SlingServlet;
 import org.apache.sling.api.SlingHttpServletRequest;
 import org.apache.sling.api.SlingHttpServletResponse;
+import org.apache.sling.api.request.RequestPathInfo;
 import org.apache.sling.api.servlets.SlingAllMethodsServlet;
 import org.apache.sling.commons.json.JSONException;
 import org.apache.sling.discovery.impl.Config;
@@ -46,14 +49,13 @@ import org.slf4j.LoggerFactory;
  * Servlet which receives topology announcements at
  * /libs/sling/topology/connector (which is reachable without authorization)
  */
-@SlingServlet(paths = { "/libs/sling/topology/connector" })
-@Property(name = "sling.auth.requirements", value = { "-/libs/sling/topology/connector" })
+@SuppressWarnings("serial")
+@SlingServlet(paths = { TopologyConnectorServlet.TOPOLOGY_CONNECTOR_PATH })
+@Property(name = "sling.auth.requirements", value = { "-"+TopologyConnectorServlet.TOPOLOGY_CONNECTOR_PATH })
 public class TopologyConnectorServlet extends SlingAllMethodsServlet {
 
     public static final String TOPOLOGY_CONNECTOR_PATH = "/libs/sling/topology/connector";
 
-    private static final long serialVersionUID = 1300640476823585873L;
-
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
     @Reference
@@ -80,75 +82,135 @@ public class TopologyConnectorServlet ex
             whitelist.add(aWhitelistEntry);
         }
     }
-
+    
     @Override
-    protected void doPost(final SlingHttpServletRequest request,
-            final SlingHttpServletResponse response) throws ServletException,
+    protected void doDelete(SlingHttpServletRequest request,
+            SlingHttpServletResponse response) throws ServletException,
             IOException {
-
+        
         if (!isWhitelisted(request)) {
-            response.sendError(404); // in theory it would be 403==forbidden, but that would reveal that 
-                                     // a resource would exist there in the first place
+            // in theory it would be 403==forbidden, but that would reveal that 
+            // a resource would exist there in the first place
+            response.sendError(HttpServletResponse.SC_NOT_FOUND); 
+            return;
+        }
+        
+        final RequestPathInfo pathInfo = request.getRequestPathInfo();
+        final String extension = pathInfo.getExtension();
+        if (!"json".equals(extension)) {
+            response.sendError(HttpServletResponse.SC_NOT_FOUND);
             return;
         }
+        final String selector = pathInfo.getSelectorString();
+        
+        announcementRegistry.unregisterAnnouncement(selector);
+    }
+    
+    @Override
+    protected void doPut(SlingHttpServletRequest request,
+            SlingHttpServletResponse response) throws ServletException,
+            IOException {
 
-        final String topologyAnnouncementJSON = request
-                .getParameter("topologyAnnouncement");
-        final String topologyDisconnect = request
-                .getParameter("topologyDisconnect");
-        if (topologyDisconnect != null) {
-            announcementRegistry.unregisterAnnouncement(topologyDisconnect);
+        if (!isWhitelisted(request)) {
+            // in theory it would be 403==forbidden, but that would reveal that 
+            // a resource would exist there in the first place
+            response.sendError(HttpServletResponse.SC_NOT_FOUND); 
+            return;
+        }
+        
+        final RequestPathInfo pathInfo = request.getRequestPathInfo();
+        final String extension = pathInfo.getExtension();
+        if (!"json".equals(extension)) {
+            response.sendError(HttpServletResponse.SC_NOT_FOUND);
             return;
         }
-        logger.debug("doPost: incoming topology announcement is: "
-                + topologyAnnouncementJSON);
+        final String selector = pathInfo.getSelectorString();
+        
+        final BufferedReader reader = request.getReader();
+        StringBuffer sb = new StringBuffer();
+        while(true) {
+            final String line = reader.readLine();
+            if (line==null) {
+                break;
+            }
+            sb.append(line);
+            sb.append(System.getProperty("line.separator"));
+        }
+
+        String topologyAnnouncementJSON = sb.toString();
+    	if (logger.isDebugEnabled()) {
+	        logger.debug("doPost: incoming topology announcement is: "
+	                + topologyAnnouncementJSON);
+    	}
         final Announcement incomingTopologyAnnouncement;
         try {
             incomingTopologyAnnouncement = Announcement
                     .fromJSON(topologyAnnouncementJSON);
-            incomingTopologyAnnouncement.removeInherited(clusterViewService
-                    .getSlingId());
-
-            if (clusterViewService.contains(incomingTopologyAnnouncement
-                    .getOwnerId())) {
-                logger.info("doPost: rejecting an announcement from an instance that is part of my cluster: "
-                        + incomingTopologyAnnouncement);
-                response.sendError(409);
+            
+            if (!incomingTopologyAnnouncement.getOwnerId().equals(selector)) {
+                response.sendError(HttpServletResponse.SC_BAD_REQUEST);
                 return;
             }
-            if (clusterViewService.containsAny(incomingTopologyAnnouncement
-                    .listInstances())) {
-                logger.info("doPost: rejecting an announcement as it contains instance(s) that is/are part of my cluster: "
-                        + incomingTopologyAnnouncement);
-                response.sendError(409);
-                return;
+            
+            String slingId = clusterViewService.getSlingId();
+            if (slingId==null) {
+            	response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+            	logger.info("doPut: no slingId available. Service not ready as expected at the moment.");
+            	return;
             }
-            if (!announcementRegistry
-                    .registerAnnouncement(incomingTopologyAnnouncement)) {
-                logger.info("doPost: rejecting an announcement from an instance that I already see in my topology: "
+			incomingTopologyAnnouncement.removeInherited(slingId);
+
+            final Announcement replyAnnouncement = new Announcement(
+                    slingId);
+
+            if (!incomingTopologyAnnouncement.isCorrectVersion()) {
+                logger.warn("doPost: rejecting an announcement from an incompatible connector protocol version: "
                         + incomingTopologyAnnouncement);
-                response.sendError(409);
+                response.sendError(HttpServletResponse.SC_BAD_REQUEST);
                 return;
-            }
-
-            Announcement replyAnnouncement = new Announcement(
-                    clusterViewService.getSlingId());
-            replyAnnouncement.setLocalCluster(clusterViewService
-                    .getClusterView());
-            announcementRegistry.addAllExcept(replyAnnouncement,
-                    new AnnouncementFilter() {
-
-                        public boolean accept(final String receivingSlingId, Announcement announcement) {
-                            if (announcement.getPrimaryKey().equals(
-                                    incomingTopologyAnnouncement
-                                            .getPrimaryKey())) {
-                                return false;
+            } else if (clusterViewService.contains(incomingTopologyAnnouncement
+                    .getOwnerId())) {
+            	if (logger.isDebugEnabled()) {
+	                logger.debug("doPost: rejecting an announcement from an instance that is part of my cluster: "
+	                        + incomingTopologyAnnouncement);
+            	}
+                // marking as 'loop'
+                replyAnnouncement.setLoop(true);
+            } else if (clusterViewService.containsAny(incomingTopologyAnnouncement
+                    .listInstances())) {
+            	if (logger.isDebugEnabled()) {
+	                logger.debug("doPost: rejecting an announcement as it contains instance(s) that is/are part of my cluster: "
+	                        + incomingTopologyAnnouncement);
+            	}
+                // marking as 'loop'
+                replyAnnouncement.setLoop(true);
+            } else if (!announcementRegistry
+                    .registerAnnouncement(incomingTopologyAnnouncement)) {
+            	if (logger.isDebugEnabled()) {
+	                logger.debug("doPost: rejecting an announcement from an instance that I already see in my topology: "
+	                        + incomingTopologyAnnouncement);
+            	}
+                // marking as 'loop'
+                replyAnnouncement.setLoop(true);
+            } else {
+                // normal, successful case: replying with the part of the topology which this instance sees
+                replyAnnouncement.setLocalCluster(clusterViewService
+                        .getClusterView());
+                announcementRegistry.addAllExcept(replyAnnouncement,
+                        new AnnouncementFilter() {
+    
+                            public boolean accept(final String receivingSlingId, Announcement announcement) {
+                                if (announcement.getPrimaryKey().equals(
+                                        incomingTopologyAnnouncement
+                                                .getPrimaryKey())) {
+                                    return false;
+                                }
+                                return true;
                             }
-                            return true;
-                        }
-                    });
+                        });
+            }
             final String p = replyAnnouncement.asJSON();
-            PrintWriter pw = response.getWriter();
+            final PrintWriter pw = response.getWriter();
             pw.print(p);
             pw.flush();
         } catch (JSONException e) {

Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties Fri Apr 26 11:08:29 2013
@@ -32,8 +32,14 @@ heartbeatInterval.name = Heartbeat inter
 heartbeatInterval.description = Configure the interval (in seconds) according to which the \
  heartbeats are exchanged in the topology. Default is 15 seconds.
 
-topologyConnectorUrl.name = Topology Connector URL
-topologyConnectorUrl.description = URL where to join a topology, e.g. \
+minEventDelay.name = Minimal Event Delay (seconds)
+minEventDelay.description = Configure a minimal delay (in seconds) between TOPOLOGY_CHANGING \
+ and TOPOLOGY_CHANGED. Any further changes happening during this delay are accumulated and \
+ combined in the TOPOLOGY_CHANGED after this delay. THis helps avoiding event-flooding. \
+ Default is 3 seconds. A negative value or zero disables this delay.
+
+topologyConnectorUrls.name = Topology Connector URLs
+topologyConnectorUrls.description = URLs where to join a topology, e.g. \
  http://localhost:4502/libs/sling/topology/connector
 
 topologyConnectorWhitelist.name = Topology Connector Whitelist

Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java Fri Apr 26 11:08:29 2013
@@ -138,6 +138,7 @@ public class SingleInstanceTest {
         assertEquals(1, assertingTopologyEventListener.getRemainingExpectedCount());
         assertEquals(0, pp.getGetCnt());
         instance.bindPropertyProvider(pp, propertyName);
+        Thread.sleep(1500);
         assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
         // we can only assume that the getProperty was called at least once - it
         // could be called multiple times though..
@@ -153,13 +154,13 @@ public class SingleInstanceTest {
         instance.runHeartbeatOnce();
         Thread.sleep(2000);
         assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
-        assertEquals(1, pp.getGetCnt());
+        assertEquals(2, pp.getGetCnt());
 
         // a heartbeat repeat should not result in another call though
         instance.runHeartbeatOnce();
         Thread.sleep(2000);
         assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
-        assertEquals(2, pp.getGetCnt());
+        assertEquals(3, pp.getGetCnt());
 
     }
 

Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java Fri Apr 26 11:08:29 2013
@@ -39,11 +39,14 @@ import javax.jcr.observation.Observation
 import junitx.util.PrivateAccessor;
 
 import org.apache.sling.api.SlingConstants;
-import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.scheduler.impl.QuartzScheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.commons.threads.impl.DefaultThreadPoolManager;
 import org.apache.sling.discovery.PropertyProvider;
+import org.apache.sling.discovery.TopologyEventListener;
 import org.apache.sling.discovery.impl.Config;
 import org.apache.sling.discovery.impl.DiscoveryServiceImpl;
 import org.apache.sling.discovery.impl.cluster.ClusterViewService;
@@ -84,6 +87,26 @@ public class Instance {
     private ResourceResolver resourceResolver;
 
     private int serviceId = 999;
+    
+    private static Scheduler singletonScheduler = null;
+    
+    private static Scheduler getSingletonScheduler() throws Exception {
+    	if (singletonScheduler!=null) {
+    		return singletonScheduler;
+    	}
+        final Scheduler newscheduler = new QuartzScheduler();
+        final ThreadPoolManager tpm = new DefaultThreadPoolManager(null, null);
+        try {
+        	PrivateAccessor.invoke(newscheduler, "bindThreadPoolManager",
+        			new Class[] { ThreadPoolManager.class },
+        			new Object[] { tpm });
+        } catch (Throwable e1) {
+        	org.junit.Assert.fail(e1.toString());
+        }
+        OSGiMock.activate(newscheduler);
+        singletonScheduler = newscheduler;
+        return singletonScheduler;
+    }
 
     private Instance(String debugName,
             ResourceResolverFactory resourceResolverFactory, boolean resetRepo)
@@ -97,10 +120,15 @@ public class Instance {
         Config config = new Config() {
             @Override
             public long getHeartbeatTimeout() {
-                return 20000;
+                return 20;
+            }
+            
+            @Override
+            public int getMinEventDelay() {
+            	return 1;
             }
         };
-
+        
         clusterViewService = OSGiFactory.createClusterViewServiceImpl(slingId,
                 resourceResolverFactory, config);
         announcementRegistry = OSGiFactory.createITopologyAnnouncementRegistry(
@@ -111,11 +139,11 @@ public class Instance {
                 resourceResolverFactory, slingId, announcementRegistry,
                 connectorRegistry, config,
                 resourceResolverFactory.getAdministrativeResourceResolver(null)
-                        .adaptTo(Repository.class));
-
-        discoveryService = OSGiFactory.createDiscoverService(slingId,
+                        .adaptTo(Repository.class), getSingletonScheduler());
+        
+		discoveryService = OSGiFactory.createDiscoverService(slingId,
                 heartbeatHandler, clusterViewService, announcementRegistry,
-                resourceResolverFactory, config);
+                resourceResolverFactory, config, connectorRegistry, getSingletonScheduler());
 
         votingHandler = OSGiFactory.createVotingHandler(slingId,
                 resourceResolverFactory, config);
@@ -272,10 +300,11 @@ public class Instance {
         }
     }
 
-    public void stop() throws LoginException {
+    public void stop() throws Exception {
         if (resourceResolver != null) {
             resourceResolver.close();
         }
+        osgiMock.deactivateAll();
     }
 
     public void bindTopologyEventListener(TopologyEventListener eventListener)

Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java Fri Apr 26 11:08:29 2013
@@ -22,6 +22,7 @@ import java.util.Dictionary;
 import java.util.Properties;
 
 import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.settings.SlingSettingsService;
 import org.hamcrest.Description;
 import org.jmock.Expectations;

Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java Fri Apr 26 11:08:29 2013
@@ -23,6 +23,7 @@ import javax.jcr.Repository;
 import junitx.util.PrivateAccessor;
 
 import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.discovery.impl.Config;
 import org.apache.sling.discovery.impl.DiscoveryServiceImpl;
 import org.apache.sling.discovery.impl.cluster.ClusterViewServiceImpl;
@@ -52,7 +53,7 @@ public class OSGiFactory {
     public static HeartbeatHandler createHeartbeatHandler(
             ResourceResolverFactory resourceResolverFactory, String slingId,
             AnnouncementRegistry topologyService,
-            ConnectorRegistry connectorRegistry, Config config, Repository repository)
+            ConnectorRegistry connectorRegistry, Config config, Repository repository, Scheduler scheduler)
             throws Exception {
         HeartbeatHandler heartbeatHandler = new HeartbeatHandler();
         PrivateAccessor.setField(heartbeatHandler, "resourceResolverFactory",
@@ -64,6 +65,7 @@ public class OSGiFactory {
         PrivateAccessor.setField(heartbeatHandler, "connectorRegistry",
                 connectorRegistry);
         PrivateAccessor.setField(heartbeatHandler, "config", config);
+        PrivateAccessor.setField(heartbeatHandler, "scheduler", scheduler);
 
         return heartbeatHandler;
     }
@@ -72,7 +74,8 @@ public class OSGiFactory {
             HeartbeatHandler heartbeatHandler,
             ClusterViewServiceImpl clusterViewService,
             AnnouncementRegistry topologyRegistry,
-            ResourceResolverFactory resourceResolverFactory, Config config)
+            ResourceResolverFactory resourceResolverFactory, Config config, 
+            ConnectorRegistry connectorRegistry, Scheduler scheduler)
             throws Exception {
         DiscoveryServiceImpl discoveryService = new DiscoveryServiceImpl();
         PrivateAccessor.setField(discoveryService, "settingsService",
@@ -86,6 +89,10 @@ public class OSGiFactory {
         PrivateAccessor.setField(discoveryService, "resourceResolverFactory",
                 resourceResolverFactory);
         PrivateAccessor.setField(discoveryService, "config", config);
+        PrivateAccessor.setField(discoveryService, "connectorRegistry",
+        		connectorRegistry);
+        PrivateAccessor.setField(discoveryService, "scheduler",
+        		scheduler);
 
         return discoveryService;
     }

Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java Fri Apr 26 11:08:29 2013
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.discovery.impl.setup;
 
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -53,18 +54,48 @@ public class OSGiMock {
         Iterator it = services.iterator(); it.hasNext();) {
             Object aService = it.next();
 
-            Method[] methods = aService.getClass().getDeclaredMethods();
-            for (int i = 0; i < methods.length; i++) {
-                Method method = methods[i];
-                if (method.getName().equals("activate")) {
-                    method.setAccessible(true);
-                    if ( method.getParameterTypes().length == 0 ) {
-                        method.invoke(aService, null);
-                    } else {
-                        method.invoke(aService, MockFactory.mockComponentContext());
-                    }
-                }
-            }
+            activate(aService);
         }
     }
+
+	public static void activate(Object aService) throws IllegalAccessException,
+			InvocationTargetException {
+		Method[] methods = aService.getClass().getDeclaredMethods();
+		for (int i = 0; i < methods.length; i++) {
+		    Method method = methods[i];
+		    if (method.getName().equals("activate")) {
+		        method.setAccessible(true);
+		        if ( method.getParameterTypes().length == 0 ) {
+		            method.invoke(aService, null);
+		        } else {
+		            method.invoke(aService, MockFactory.mockComponentContext());
+		        }
+		    }
+		}
+	}
+
+	public void deactivateAll() throws Exception {
+        for (@SuppressWarnings("rawtypes")
+        Iterator it = services.iterator(); it.hasNext();) {
+            Object aService = it.next();
+
+            deactivate(aService);
+        }
+	}
+
+	public static void deactivate(Object aService) throws IllegalAccessException,
+			InvocationTargetException {
+		Method[] methods = aService.getClass().getDeclaredMethods();
+		for (int i = 0; i < methods.length; i++) {
+		    Method method = methods[i];
+		    if (method.getName().equals("deactivate")) {
+		        method.setAccessible(true);
+		        if ( method.getParameterTypes().length == 0 ) {
+		            method.invoke(aService, null);
+		        } else {
+		            method.invoke(aService, MockFactory.mockComponentContext());
+		        }
+		    }
+		}
+	}
 }

Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java Fri Apr 26 11:08:29 2013
@@ -29,7 +29,6 @@ import org.apache.sling.discovery.impl.s
 import org.apache.sling.discovery.impl.setup.MockFactory;
 import org.apache.sling.discovery.impl.setup.OSGiFactory;
 import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry;
-import org.apache.sling.discovery.impl.topology.connector.TopologyConnectorClientInformation.OriginInfo;
 import org.junit.Test;
 
 public class ConnectorRegistryTest {
@@ -51,44 +50,37 @@ public class ConnectorRegistryTest {
         ConnectorRegistry c = OSGiFactory.createConnectorRegistry(
                 announcementRegistry, config);
 
-        final URL url = new URL("http://localhost:1234");
+        final URL url = new URL("http://localhost:1234/connector");
         final ClusterViewService cvs = i.getClusterViewService();
         try {
-            c.registerOutgoingConnection(cvs, url, null);
+            c.registerOutgoingConnector(null, url);
             fail("should have complained");
         } catch (IllegalArgumentException e) {
             // ok
         }
         try {
-            c.registerOutgoingConnection(null, url, OriginInfo.Programmatically);
-            fail("should have complained");
-        } catch (IllegalArgumentException e) {
-            // ok
-        }
-        try {
-            c.registerOutgoingConnection(cvs, null, OriginInfo.Config);
+            c.registerOutgoingConnector(cvs, null);
             fail("should have complained");
         } catch (IllegalArgumentException e) {
             // ok
         }
         TopologyConnectorClientInformation client = c
-                .registerOutgoingConnection(cvs, url, OriginInfo.WebConsole);
+                .registerOutgoingConnector(cvs, url);
         try {
             // should not be able to register same url twice
-            client = c.registerOutgoingConnection(cvs, url,
-                    OriginInfo.Programmatically);
+            client = c.registerOutgoingConnector(cvs, url);
             fail("should have complained");
         } catch (IllegalStateException e) {
             // ok
         }
 
         try {
-            c.unregisterOutgoingConnection(null);
+            c.unregisterOutgoingConnector(null);
             fail("should have complained");
         } catch (IllegalArgumentException e) {
             // ok
         }
 
-        c.unregisterOutgoingConnection(client.getId());
+        c.unregisterOutgoingConnector(client.getId());
     }
 }