You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2013/04/26 13:08:29 UTC
svn commit: r1476138 [2/2] - in
/sling/trunk/contrib/extensions/discovery/impl: ./
src/main/java/org/apache/sling/discovery/impl/
src/main/java/org/apache/sling/discovery/impl/cluster/
src/main/java/org/apache/sling/discovery/impl/cluster/voting/ src/m...
Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryImpl.java Fri Apr 26 11:08:29 2013
@@ -28,13 +28,14 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.discovery.impl.Config;
import org.apache.sling.discovery.impl.cluster.ClusterViewService;
import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry;
-import org.apache.sling.discovery.impl.topology.connector.TopologyConnectorClientInformation.OriginInfo;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,13 +58,24 @@ public class ConnectorRegistryImpl imple
/** the local port is added to the announcement as the serverInfo object **/
private String port = "";
- protected void activate(ComponentContext cc) {
+ @Activate
+ protected void activate(final ComponentContext cc) {
port = cc.getBundleContext().getProperty("org.osgi.service.http.port");
}
-
- public TopologyConnectorClientInformation registerOutgoingConnection(
- final ClusterViewService clusterViewService, final URL connectorUrl,
- final OriginInfo originInfo) {
+
+ @Deactivate
+ protected void deactivate() {
+ synchronized (outgoingClientsMap) {
+ for (Iterator<TopologyConnectorClient> it = outgoingClientsMap.values().iterator(); it.hasNext();) {
+ final TopologyConnectorClient client = it.next();
+ client.disconnect();
+ it.remove();
+ }
+ }
+ }
+
+ public TopologyConnectorClientInformation registerOutgoingConnector(
+ final ClusterViewService clusterViewService, final URL connectorUrl) {
if (announcementRegistry == null) {
logger.error("registerOutgoingConnection: announcementRegistry is null");
return null;
@@ -86,7 +98,7 @@ public class ConnectorRegistryImpl imple
serverInfo = "localhost:" + port;
}
client = new TopologyConnectorClient(clusterViewService,
- announcementRegistry, config, connectorUrl, originInfo,
+ announcementRegistry, config, connectorUrl,
serverInfo);
outgoingClientsMap.put(client.getId(), client);
}
@@ -94,7 +106,7 @@ public class ConnectorRegistryImpl imple
return client;
}
- public Collection<TopologyConnectorClientInformation> listOutgoingConnections() {
+ public Collection<TopologyConnectorClientInformation> listOutgoingConnectors() {
final List<TopologyConnectorClientInformation> result = new ArrayList<TopologyConnectorClientInformation>();
synchronized (outgoingClientsMap) {
result.addAll(outgoingClientsMap.values());
@@ -102,7 +114,7 @@ public class ConnectorRegistryImpl imple
return result;
}
- public boolean unregisterOutgoingConnection(final String id) {
+ public boolean unregisterOutgoingConnector(final String id) {
if (id == null || id.length() == 0) {
throw new IllegalArgumentException("id must not be null");
}
@@ -115,20 +127,7 @@ public class ConnectorRegistryImpl imple
}
}
- public boolean pingOutgoingConnection(final String id) {
- if (id == null || id.length() == 0) {
- throw new IllegalArgumentException("id must not be null");
- }
- synchronized (outgoingClientsMap) {
- TopologyConnectorClient client = outgoingClientsMap.get(id);
- if (client != null) {
- client.ping();
- }
- return client != null;
- }
- }
-
- public void pingOutgoingConnections() {
+ public void pingOutgoingConnectors() {
List<TopologyConnectorClient> outgoingTemplatesClone;
synchronized (outgoingClientsMap) {
outgoingTemplatesClone = new ArrayList<TopologyConnectorClient>(
Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClient.java Fri Apr 26 11:08:29 2013
@@ -23,12 +23,16 @@ import java.net.URL;
import java.util.Iterator;
import java.util.UUID;
+import javax.servlet.http.HttpServletResponse;
+
import org.apache.commons.httpclient.Credentials;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.URIException;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
-import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.httpclient.methods.DeleteMethod;
+import org.apache.commons.httpclient.methods.PutMethod;
+import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.impl.Config;
@@ -66,9 +70,6 @@ public class TopologyConnectorClient imp
/** the last inherited announcement **/
private Announcement lastInheritedAnnouncement;
- /** the information as to where this connector came from **/
- private final OriginInfo originInfo;
-
/** the information about this server **/
private final String serverInfo;
@@ -77,7 +78,7 @@ public class TopologyConnectorClient imp
TopologyConnectorClient(final ClusterViewService clusterViewService,
final AnnouncementRegistry announcementRegistry, final Config config,
- final URL connectorUrl, final OriginInfo originInfo, final String serverInfo) {
+ final URL connectorUrl, final String serverInfo) {
if (clusterViewService == null) {
throw new IllegalArgumentException(
"clusterViewService must not be null");
@@ -92,24 +93,22 @@ public class TopologyConnectorClient imp
if (connectorUrl == null) {
throw new IllegalArgumentException("connectorUrl must not be null");
}
- if (originInfo == null) {
- throw new IllegalArgumentException("originInfo must not be null");
- }
this.clusterViewService = clusterViewService;
this.announcementRegistry = announcementRegistry;
this.config = config;
this.connectorUrl = connectorUrl;
- this.originInfo = originInfo;
this.serverInfo = serverInfo;
this.id = UUID.randomUUID();
}
/** ping the server and pass the announcements between the two **/
void ping() {
- logger.debug("ping: connectorUrl=" + connectorUrl);
+ final String uri = connectorUrl.toString()+"."+clusterViewService.getSlingId()+".json";
+ if (logger.isDebugEnabled()) {
+ logger.debug("ping: connectorUrl=" + connectorUrl + ", complete uri=" + uri);
+ }
HttpClient httpClient = new HttpClient();
- PostMethod method = new PostMethod(connectorUrl.toString());
-
+ PutMethod method = new PutMethod(uri);
try {
String userInfo = connectorUrl.getUserInfo();
if (userInfo != null) {
@@ -121,7 +120,6 @@ public class TopologyConnectorClient imp
Announcement topologyAnnouncement = new Announcement(
clusterViewService.getSlingId());
- topologyAnnouncement.setOriginInfo(originInfo);
topologyAnnouncement.setServerInfo(serverInfo);
topologyAnnouncement.setLocalCluster(clusterViewService
.getClusterView());
@@ -147,37 +145,60 @@ public class TopologyConnectorClient imp
});
final String p = topologyAnnouncement.asJSON();
- logger.debug("ping: topologyAnnouncement json is: " + p);
- method.addParameter("topologyAnnouncement", p);
+ if (logger.isDebugEnabled()) {
+ logger.debug("ping: topologyAnnouncement json is: " + p);
+ }
+ method.setRequestEntity(new StringRequestEntity(p, "application/json", "UTF-8"));
httpClient.executeMethod(method);
- logger.debug("ping: done. code=" + method.getStatusCode() + " - "
- + method.getStatusText());
+ if (logger.isDebugEnabled()) {
+ logger.debug("ping: done. code=" + method.getStatusCode() + " - "
+ + method.getStatusText());
+ }
lastStatusCode = method.getStatusCode();
- if (method.getStatusCode()==200) {
+ if (method.getStatusCode()==HttpServletResponse.SC_OK) {
String responseBody = method.getResponseBodyAsString(16*1024*1024); // limiting to 16MB, should be way enough
- logger.debug("ping: response body=" + responseBody);
- Announcement inheritedAnnouncement = Announcement
- .fromJSON(responseBody);
- inheritedAnnouncement.setInherited(true);
- if (!announcementRegistry
- .registerAnnouncement(inheritedAnnouncement)) {
- logger.info("ping: connector response is from an instance which I already see in my topology"
- + inheritedAnnouncement);
+ if (logger.isDebugEnabled()) {
+ logger.debug("ping: response body=" + responseBody);
+ }
+ if (responseBody!=null && responseBody.length()>0) {
+ Announcement inheritedAnnouncement = Announcement
+ .fromJSON(responseBody);
+ if (inheritedAnnouncement.isLoop()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ping: connector response indicated a loop detected. not registering this announcement from "+
+ inheritedAnnouncement.getOwnerId());
+ }
+ } else {
+ inheritedAnnouncement.setInherited(true);
+ if (!announcementRegistry
+ .registerAnnouncement(inheritedAnnouncement)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("ping: connector response is from an instance which I already see in my topology"
+ + inheritedAnnouncement);
+ }
+ lastInheritedAnnouncement = null;
+ return;
+ }
+ }
+ lastInheritedAnnouncement = inheritedAnnouncement;
+ } else {
lastInheritedAnnouncement = null;
- return;
}
- lastInheritedAnnouncement = inheritedAnnouncement;
} else {
lastInheritedAnnouncement = null;
}
} catch (URIException e) {
logger.warn("ping: Got URIException: " + e);
+ lastInheritedAnnouncement = null;
} catch (IOException e) {
logger.warn("ping: got IOException: " + e);
+ lastInheritedAnnouncement = null;
} catch (JSONException e) {
logger.warn("ping: got JSONException: " + e);
+ lastInheritedAnnouncement = null;
} catch (RuntimeException re) {
- logger.error("ping: got RuntimeException: " + re, re);
+ logger.warn("ping: got RuntimeException: " + re, re);
+ lastInheritedAnnouncement = null;
}
}
@@ -188,6 +209,14 @@ public class TopologyConnectorClient imp
public URL getConnectorUrl() {
return connectorUrl;
}
+
+ public boolean representsLoop() {
+ if (lastInheritedAnnouncement == null) {
+ return false;
+ } else {
+ return lastInheritedAnnouncement.isLoop();
+ }
+ }
public boolean isConnected() {
if (lastInheritedAnnouncement == null) {
@@ -205,17 +234,16 @@ public class TopologyConnectorClient imp
}
}
- public OriginInfo getOriginInfo() {
- return originInfo;
- }
-
public String getId() {
return id.toString();
}
/** Disconnect this connector **/
public void disconnect() {
- logger.debug("disconnect: connectorUrl=" + connectorUrl);
+ final String uri = connectorUrl.toString()+"."+clusterViewService.getSlingId()+".json";
+ if (logger.isDebugEnabled()) {
+ logger.debug("disconnect: connectorUrl=" + connectorUrl + ", complete uri="+uri);
+ }
if (lastInheritedAnnouncement != null) {
announcementRegistry
@@ -224,7 +252,7 @@ public class TopologyConnectorClient imp
}
HttpClient httpClient = new HttpClient();
- PostMethod method = new PostMethod(connectorUrl.toString());
+ DeleteMethod method = new DeleteMethod(uri);
try {
String userInfo = connectorUrl.getUserInfo();
@@ -235,11 +263,13 @@ public class TopologyConnectorClient imp
.getURI().getPort()), c);
}
- method.addParameter("topologyDisconnect",
- clusterViewService.getSlingId());
httpClient.executeMethod(method);
- logger.debug("disconnect: done. code=" + method.getStatusCode()
- + " - " + method.getStatusText());
+ if (logger.isDebugEnabled()) {
+ logger.debug("disconnect: done. code=" + method.getStatusCode()
+ + " - " + method.getStatusText());
+ }
+ // ignoring the actual statuscode though as there's little we can
+ // do about it after this point
} catch (URIException e) {
logger.warn("disconnect: Got URIException: " + e);
} catch (IOException e) {
Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorClientInformation.java Fri Apr 26 11:08:29 2013
@@ -25,12 +25,6 @@ import java.net.URL;
*/
public interface TopologyConnectorClientInformation {
- enum OriginInfo {
- Config, // this connector was created via config
- WebConsole, // this connector was created via the wbconsole
- Programmatically // this connector was created programmatically
- }
-
/** the endpoint url where this connector is connecting to **/
URL getConnectorUrl();
@@ -40,12 +34,13 @@ public interface TopologyConnectorClient
/** whether or not this connector was able to successfully connect **/
boolean isConnected();
+ /** whether or not the counterpart of this connector has detected a loop in the topology connectors **/
+ boolean representsLoop();
+
/** the sling id of the remote end **/
String getRemoteSlingId();
/** the unique id of this connector **/
String getId();
- /** the information about how this connector was created **/
- OriginInfo getOriginInfo();
}
Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/topology/connector/TopologyConnectorServlet.java Fri Apr 26 11:08:29 2013
@@ -18,18 +18,21 @@
*/
package org.apache.sling.discovery.impl.topology.connector;
+import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashSet;
import java.util.Set;
import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.sling.SlingServlet;
import org.apache.sling.api.SlingHttpServletRequest;
import org.apache.sling.api.SlingHttpServletResponse;
+import org.apache.sling.api.request.RequestPathInfo;
import org.apache.sling.api.servlets.SlingAllMethodsServlet;
import org.apache.sling.commons.json.JSONException;
import org.apache.sling.discovery.impl.Config;
@@ -46,14 +49,13 @@ import org.slf4j.LoggerFactory;
* Servlet which receives topology announcements at
* /libs/sling/topology/connector (which is reachable without authorization)
*/
-@SlingServlet(paths = { "/libs/sling/topology/connector" })
-@Property(name = "sling.auth.requirements", value = { "-/libs/sling/topology/connector" })
+@SuppressWarnings("serial")
+@SlingServlet(paths = { TopologyConnectorServlet.TOPOLOGY_CONNECTOR_PATH })
+@Property(name = "sling.auth.requirements", value = { "-"+TopologyConnectorServlet.TOPOLOGY_CONNECTOR_PATH })
public class TopologyConnectorServlet extends SlingAllMethodsServlet {
public static final String TOPOLOGY_CONNECTOR_PATH = "/libs/sling/topology/connector";
- private static final long serialVersionUID = 1300640476823585873L;
-
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Reference
@@ -80,75 +82,135 @@ public class TopologyConnectorServlet ex
whitelist.add(aWhitelistEntry);
}
}
-
+
@Override
- protected void doPost(final SlingHttpServletRequest request,
- final SlingHttpServletResponse response) throws ServletException,
+ protected void doDelete(SlingHttpServletRequest request,
+ SlingHttpServletResponse response) throws ServletException,
IOException {
-
+
if (!isWhitelisted(request)) {
- response.sendError(404); // in theory it would be 403==forbidden, but that would reveal that
- // a resource would exist there in the first place
+ // in theory it would be 403==forbidden, but that would reveal that
+ // a resource would exist there in the first place
+ response.sendError(HttpServletResponse.SC_NOT_FOUND);
+ return;
+ }
+
+ final RequestPathInfo pathInfo = request.getRequestPathInfo();
+ final String extension = pathInfo.getExtension();
+ if (!"json".equals(extension)) {
+ response.sendError(HttpServletResponse.SC_NOT_FOUND);
return;
}
+ final String selector = pathInfo.getSelectorString();
+
+ announcementRegistry.unregisterAnnouncement(selector);
+ }
+
+ @Override
+ protected void doPut(SlingHttpServletRequest request,
+ SlingHttpServletResponse response) throws ServletException,
+ IOException {
- final String topologyAnnouncementJSON = request
- .getParameter("topologyAnnouncement");
- final String topologyDisconnect = request
- .getParameter("topologyDisconnect");
- if (topologyDisconnect != null) {
- announcementRegistry.unregisterAnnouncement(topologyDisconnect);
+ if (!isWhitelisted(request)) {
+ // in theory it would be 403==forbidden, but that would reveal that
+ // a resource would exist there in the first place
+ response.sendError(HttpServletResponse.SC_NOT_FOUND);
+ return;
+ }
+
+ final RequestPathInfo pathInfo = request.getRequestPathInfo();
+ final String extension = pathInfo.getExtension();
+ if (!"json".equals(extension)) {
+ response.sendError(HttpServletResponse.SC_NOT_FOUND);
return;
}
- logger.debug("doPost: incoming topology announcement is: "
- + topologyAnnouncementJSON);
+ final String selector = pathInfo.getSelectorString();
+
+ final BufferedReader reader = request.getReader();
+ StringBuffer sb = new StringBuffer();
+ while(true) {
+ final String line = reader.readLine();
+ if (line==null) {
+ break;
+ }
+ sb.append(line);
+ sb.append(System.getProperty("line.separator"));
+ }
+
+ String topologyAnnouncementJSON = sb.toString();
+ if (logger.isDebugEnabled()) {
+ logger.debug("doPost: incoming topology announcement is: "
+ + topologyAnnouncementJSON);
+ }
final Announcement incomingTopologyAnnouncement;
try {
incomingTopologyAnnouncement = Announcement
.fromJSON(topologyAnnouncementJSON);
- incomingTopologyAnnouncement.removeInherited(clusterViewService
- .getSlingId());
-
- if (clusterViewService.contains(incomingTopologyAnnouncement
- .getOwnerId())) {
- logger.info("doPost: rejecting an announcement from an instance that is part of my cluster: "
- + incomingTopologyAnnouncement);
- response.sendError(409);
+
+ if (!incomingTopologyAnnouncement.getOwnerId().equals(selector)) {
+ response.sendError(HttpServletResponse.SC_BAD_REQUEST);
return;
}
- if (clusterViewService.containsAny(incomingTopologyAnnouncement
- .listInstances())) {
- logger.info("doPost: rejecting an announcement as it contains instance(s) that is/are part of my cluster: "
- + incomingTopologyAnnouncement);
- response.sendError(409);
- return;
+
+ String slingId = clusterViewService.getSlingId();
+ if (slingId==null) {
+ response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ logger.info("doPut: no slingId available. Service not ready as expected at the moment.");
+ return;
}
- if (!announcementRegistry
- .registerAnnouncement(incomingTopologyAnnouncement)) {
- logger.info("doPost: rejecting an announcement from an instance that I already see in my topology: "
+ incomingTopologyAnnouncement.removeInherited(slingId);
+
+ final Announcement replyAnnouncement = new Announcement(
+ slingId);
+
+ if (!incomingTopologyAnnouncement.isCorrectVersion()) {
+ logger.warn("doPost: rejecting an announcement from an incompatible connector protocol version: "
+ incomingTopologyAnnouncement);
- response.sendError(409);
+ response.sendError(HttpServletResponse.SC_BAD_REQUEST);
return;
- }
-
- Announcement replyAnnouncement = new Announcement(
- clusterViewService.getSlingId());
- replyAnnouncement.setLocalCluster(clusterViewService
- .getClusterView());
- announcementRegistry.addAllExcept(replyAnnouncement,
- new AnnouncementFilter() {
-
- public boolean accept(final String receivingSlingId, Announcement announcement) {
- if (announcement.getPrimaryKey().equals(
- incomingTopologyAnnouncement
- .getPrimaryKey())) {
- return false;
+ } else if (clusterViewService.contains(incomingTopologyAnnouncement
+ .getOwnerId())) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("doPost: rejecting an announcement from an instance that is part of my cluster: "
+ + incomingTopologyAnnouncement);
+ }
+ // marking as 'loop'
+ replyAnnouncement.setLoop(true);
+ } else if (clusterViewService.containsAny(incomingTopologyAnnouncement
+ .listInstances())) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("doPost: rejecting an announcement as it contains instance(s) that is/are part of my cluster: "
+ + incomingTopologyAnnouncement);
+ }
+ // marking as 'loop'
+ replyAnnouncement.setLoop(true);
+ } else if (!announcementRegistry
+ .registerAnnouncement(incomingTopologyAnnouncement)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("doPost: rejecting an announcement from an instance that I already see in my topology: "
+ + incomingTopologyAnnouncement);
+ }
+ // marking as 'loop'
+ replyAnnouncement.setLoop(true);
+ } else {
+ // normal, successful case: replying with the part of the topology which this instance sees
+ replyAnnouncement.setLocalCluster(clusterViewService
+ .getClusterView());
+ announcementRegistry.addAllExcept(replyAnnouncement,
+ new AnnouncementFilter() {
+
+ public boolean accept(final String receivingSlingId, Announcement announcement) {
+ if (announcement.getPrimaryKey().equals(
+ incomingTopologyAnnouncement
+ .getPrimaryKey())) {
+ return false;
+ }
+ return true;
}
- return true;
- }
- });
+ });
+ }
final String p = replyAnnouncement.asJSON();
- PrintWriter pw = response.getWriter();
+ final PrintWriter pw = response.getWriter();
pw.print(p);
pw.flush();
} catch (JSONException e) {
Modified: sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/main/resources/OSGI-INF/metatype/metatype.properties Fri Apr 26 11:08:29 2013
@@ -32,8 +32,14 @@ heartbeatInterval.name = Heartbeat inter
heartbeatInterval.description = Configure the interval (in seconds) according to which the \
heartbeats are exchanged in the topology. Default is 15 seconds.
-topologyConnectorUrl.name = Topology Connector URL
-topologyConnectorUrl.description = URL where to join a topology, e.g. \
+minEventDelay.name = Minimal Event Delay (seconds)
+minEventDelay.description = Configure a minimal delay (in seconds) between TOPOLOGY_CHANGING \
+ and TOPOLOGY_CHANGED. Any further changes happening during this delay are accumulated and \
+ combined in the TOPOLOGY_CHANGED after this delay. This helps avoid event-flooding. \
+ Default is 3 seconds. A negative value or zero disables this delay.
+
+topologyConnectorUrls.name = Topology Connector URLs
+topologyConnectorUrls.description = URLs where to join a topology, e.g. \
http://localhost:4502/libs/sling/topology/connector
topologyConnectorWhitelist.name = Topology Connector Whitelist
Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/cluster/SingleInstanceTest.java Fri Apr 26 11:08:29 2013
@@ -138,6 +138,7 @@ public class SingleInstanceTest {
assertEquals(1, assertingTopologyEventListener.getRemainingExpectedCount());
assertEquals(0, pp.getGetCnt());
instance.bindPropertyProvider(pp, propertyName);
+ Thread.sleep(1500);
assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
// we can only assume that the getProperty was called at least once - it
// could be called multiple times though..
@@ -153,13 +154,13 @@ public class SingleInstanceTest {
instance.runHeartbeatOnce();
Thread.sleep(2000);
assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
- assertEquals(1, pp.getGetCnt());
+ assertEquals(2, pp.getGetCnt());
// a heartbeat repeat should not result in another call though
instance.runHeartbeatOnce();
Thread.sleep(2000);
assertEquals(0, assertingTopologyEventListener.getRemainingExpectedCount());
- assertEquals(2, pp.getGetCnt());
+ assertEquals(3, pp.getGetCnt());
}
Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/Instance.java Fri Apr 26 11:08:29 2013
@@ -39,11 +39,14 @@ import javax.jcr.observation.Observation
import junitx.util.PrivateAccessor;
import org.apache.sling.api.SlingConstants;
-import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.scheduler.impl.QuartzScheduler;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.commons.threads.impl.DefaultThreadPoolManager;
import org.apache.sling.discovery.PropertyProvider;
+import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.discovery.impl.Config;
import org.apache.sling.discovery.impl.DiscoveryServiceImpl;
import org.apache.sling.discovery.impl.cluster.ClusterViewService;
@@ -84,6 +87,26 @@ public class Instance {
private ResourceResolver resourceResolver;
private int serviceId = 999;
+
+ private static Scheduler singletonScheduler = null;
+
+ private static Scheduler getSingletonScheduler() throws Exception {
+ if (singletonScheduler!=null) {
+ return singletonScheduler;
+ }
+ final Scheduler newscheduler = new QuartzScheduler();
+ final ThreadPoolManager tpm = new DefaultThreadPoolManager(null, null);
+ try {
+ PrivateAccessor.invoke(newscheduler, "bindThreadPoolManager",
+ new Class[] { ThreadPoolManager.class },
+ new Object[] { tpm });
+ } catch (Throwable e1) {
+ org.junit.Assert.fail(e1.toString());
+ }
+ OSGiMock.activate(newscheduler);
+ singletonScheduler = newscheduler;
+ return singletonScheduler;
+ }
private Instance(String debugName,
ResourceResolverFactory resourceResolverFactory, boolean resetRepo)
@@ -97,10 +120,15 @@ public class Instance {
Config config = new Config() {
@Override
public long getHeartbeatTimeout() {
- return 20000;
+ return 20;
+ }
+
+ @Override
+ public int getMinEventDelay() {
+ return 1;
}
};
-
+
clusterViewService = OSGiFactory.createClusterViewServiceImpl(slingId,
resourceResolverFactory, config);
announcementRegistry = OSGiFactory.createITopologyAnnouncementRegistry(
@@ -111,11 +139,11 @@ public class Instance {
resourceResolverFactory, slingId, announcementRegistry,
connectorRegistry, config,
resourceResolverFactory.getAdministrativeResourceResolver(null)
- .adaptTo(Repository.class));
-
- discoveryService = OSGiFactory.createDiscoverService(slingId,
+ .adaptTo(Repository.class), getSingletonScheduler());
+
+ discoveryService = OSGiFactory.createDiscoverService(slingId,
heartbeatHandler, clusterViewService, announcementRegistry,
- resourceResolverFactory, config);
+ resourceResolverFactory, config, connectorRegistry, getSingletonScheduler());
votingHandler = OSGiFactory.createVotingHandler(slingId,
resourceResolverFactory, config);
@@ -272,10 +300,11 @@ public class Instance {
}
}
- public void stop() throws LoginException {
+ public void stop() throws Exception {
if (resourceResolver != null) {
resourceResolver.close();
}
+ osgiMock.deactivateAll();
}
public void bindTopologyEventListener(TopologyEventListener eventListener)
Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/MockFactory.java Fri Apr 26 11:08:29 2013
@@ -22,6 +22,7 @@ import java.util.Dictionary;
import java.util.Properties;
import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.settings.SlingSettingsService;
import org.hamcrest.Description;
import org.jmock.Expectations;
Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiFactory.java Fri Apr 26 11:08:29 2013
@@ -23,6 +23,7 @@ import javax.jcr.Repository;
import junitx.util.PrivateAccessor;
import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.discovery.impl.Config;
import org.apache.sling.discovery.impl.DiscoveryServiceImpl;
import org.apache.sling.discovery.impl.cluster.ClusterViewServiceImpl;
@@ -52,7 +53,7 @@ public class OSGiFactory {
public static HeartbeatHandler createHeartbeatHandler(
ResourceResolverFactory resourceResolverFactory, String slingId,
AnnouncementRegistry topologyService,
- ConnectorRegistry connectorRegistry, Config config, Repository repository)
+ ConnectorRegistry connectorRegistry, Config config, Repository repository, Scheduler scheduler)
throws Exception {
HeartbeatHandler heartbeatHandler = new HeartbeatHandler();
PrivateAccessor.setField(heartbeatHandler, "resourceResolverFactory",
@@ -64,6 +65,7 @@ public class OSGiFactory {
PrivateAccessor.setField(heartbeatHandler, "connectorRegistry",
connectorRegistry);
PrivateAccessor.setField(heartbeatHandler, "config", config);
+ PrivateAccessor.setField(heartbeatHandler, "scheduler", scheduler);
return heartbeatHandler;
}
@@ -72,7 +74,8 @@ public class OSGiFactory {
HeartbeatHandler heartbeatHandler,
ClusterViewServiceImpl clusterViewService,
AnnouncementRegistry topologyRegistry,
- ResourceResolverFactory resourceResolverFactory, Config config)
+ ResourceResolverFactory resourceResolverFactory, Config config,
+ ConnectorRegistry connectorRegistry, Scheduler scheduler)
throws Exception {
DiscoveryServiceImpl discoveryService = new DiscoveryServiceImpl();
PrivateAccessor.setField(discoveryService, "settingsService",
@@ -86,6 +89,10 @@ public class OSGiFactory {
PrivateAccessor.setField(discoveryService, "resourceResolverFactory",
resourceResolverFactory);
PrivateAccessor.setField(discoveryService, "config", config);
+ PrivateAccessor.setField(discoveryService, "connectorRegistry",
+ connectorRegistry);
+ PrivateAccessor.setField(discoveryService, "scheduler",
+ scheduler);
return discoveryService;
}
Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/setup/OSGiMock.java Fri Apr 26 11:08:29 2013
@@ -18,6 +18,7 @@
*/
package org.apache.sling.discovery.impl.setup;
+import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.LinkedList;
@@ -53,18 +54,48 @@ public class OSGiMock {
Iterator it = services.iterator(); it.hasNext();) {
Object aService = it.next();
- Method[] methods = aService.getClass().getDeclaredMethods();
- for (int i = 0; i < methods.length; i++) {
- Method method = methods[i];
- if (method.getName().equals("activate")) {
- method.setAccessible(true);
- if ( method.getParameterTypes().length == 0 ) {
- method.invoke(aService, null);
- } else {
- method.invoke(aService, MockFactory.mockComponentContext());
- }
- }
- }
+ activate(aService);
}
}
+
+ public static void activate(Object aService) throws IllegalAccessException,
+ InvocationTargetException {
+ Method[] methods = aService.getClass().getDeclaredMethods();
+ for (int i = 0; i < methods.length; i++) {
+ Method method = methods[i];
+ if (method.getName().equals("activate")) {
+ method.setAccessible(true);
+ if ( method.getParameterTypes().length == 0 ) {
+ method.invoke(aService, null);
+ } else {
+ method.invoke(aService, MockFactory.mockComponentContext());
+ }
+ }
+ }
+ }
+
+ public void deactivateAll() throws Exception {
+ for (@SuppressWarnings("rawtypes")
+ Iterator it = services.iterator(); it.hasNext();) {
+ Object aService = it.next();
+
+ deactivate(aService);
+ }
+ }
+
+ public static void deactivate(Object aService) throws IllegalAccessException,
+ InvocationTargetException {
+ Method[] methods = aService.getClass().getDeclaredMethods();
+ for (int i = 0; i < methods.length; i++) {
+ Method method = methods[i];
+ if (method.getName().equals("deactivate")) {
+ method.setAccessible(true);
+ if ( method.getParameterTypes().length == 0 ) {
+ method.invoke(aService, null);
+ } else {
+ method.invoke(aService, MockFactory.mockComponentContext());
+ }
+ }
+ }
+ }
}
Modified: sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java?rev=1476138&r1=1476137&r2=1476138&view=diff
==============================================================================
--- sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java (original)
+++ sling/trunk/contrib/extensions/discovery/impl/src/test/java/org/apache/sling/discovery/impl/topology/connector/ConnectorRegistryTest.java Fri Apr 26 11:08:29 2013
@@ -29,7 +29,6 @@ import org.apache.sling.discovery.impl.s
import org.apache.sling.discovery.impl.setup.MockFactory;
import org.apache.sling.discovery.impl.setup.OSGiFactory;
import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry;
-import org.apache.sling.discovery.impl.topology.connector.TopologyConnectorClientInformation.OriginInfo;
import org.junit.Test;
public class ConnectorRegistryTest {
@@ -51,44 +50,37 @@ public class ConnectorRegistryTest {
ConnectorRegistry c = OSGiFactory.createConnectorRegistry(
announcementRegistry, config);
- final URL url = new URL("http://localhost:1234");
+ final URL url = new URL("http://localhost:1234/connector");
final ClusterViewService cvs = i.getClusterViewService();
try {
- c.registerOutgoingConnection(cvs, url, null);
+ c.registerOutgoingConnector(null, url);
fail("should have complained");
} catch (IllegalArgumentException e) {
// ok
}
try {
- c.registerOutgoingConnection(null, url, OriginInfo.Programmatically);
- fail("should have complained");
- } catch (IllegalArgumentException e) {
- // ok
- }
- try {
- c.registerOutgoingConnection(cvs, null, OriginInfo.Config);
+ c.registerOutgoingConnector(cvs, null);
fail("should have complained");
} catch (IllegalArgumentException e) {
// ok
}
TopologyConnectorClientInformation client = c
- .registerOutgoingConnection(cvs, url, OriginInfo.WebConsole);
+ .registerOutgoingConnector(cvs, url);
try {
// should not be able to register same url twice
- client = c.registerOutgoingConnection(cvs, url,
- OriginInfo.Programmatically);
+ client = c.registerOutgoingConnector(cvs, url);
fail("should have complained");
} catch (IllegalStateException e) {
// ok
}
try {
- c.unregisterOutgoingConnection(null);
+ c.unregisterOutgoingConnector(null);
fail("should have complained");
} catch (IllegalArgumentException e) {
// ok
}
- c.unregisterOutgoingConnection(client.getId());
+ c.unregisterOutgoingConnector(client.getId());
}
}