You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2015/10/20 16:12:34 UTC
svn commit: r1709601 [2/11] - in /sling/trunk/bundles/extensions/discovery:
base/ base/src/ base/src/main/ base/src/main/java/ base/src/main/java/org/
base/src/main/java/org/apache/ base/src/main/java/org/apache/sling/
base/src/main/java/org/apache/sli...
Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/Announcement.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/Announcement.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/Announcement.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/Announcement.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.announcement;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.commons.providers.DefaultClusterView;
+import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription;
+import org.apache.sling.discovery.commons.providers.NonLocalInstanceDescription;
+
+/**
+ * An announcement is the information exchanged by the topology connector and
+ * contains all clusters and instances which both the topology connector client
+ * and servlet see (in their part before joining the two worlds).
+ * <p>
+ * An announcement is exchanged in json format and carries a timeout.
+ */
+public class Announcement {
+
+ /**
+  * Protocol version spoken by this implementation. A differing version on an
+  * announcement is used to detect incompatible topology connectors.
+  */
+ private static final int PROTOCOL_VERSION = 1;
+
+ /** Sling id of the owner, i.e. of the instance this announcement comes from. **/
+ private final String ownerId;
+
+ /** Protocol version carried by this particular announcement. **/
+ private final int protocolVersion;
+
+ /** The local cluster view. **/
+ private ClusterView localCluster;
+
+ /** The incoming (nested) announcements. **/
+ private List<Announcement> incomings = new LinkedList<Announcement>();
+
+ /** Whether this announcement was inherited (response of a connect) or is incoming (the connect). **/
+ private boolean inherited;
+
+ /** Some information about the server this announcement came from. **/
+ private String serverInfo;
+
+ /** Whether this announcement represents a loop detected in the topology connectors. **/
+ private boolean loop;
+
+ /** SLING-3382: backoffInterval the connector servlet passes back to the client to use as the next heartbeatInterval. **/
+ private long backoffInterval = -1;
+
+ /** SLING-3382: flag sent from client to server indicating that the client wants to restart the backoff from scratch. **/
+ private boolean resetBackoff;
+
+ public Announcement(final String ownerId) {
+ this(ownerId, PROTOCOL_VERSION);
+ }
+
+ public Announcement(final String ownerId, int protocolVersion) {
+ if (ownerId==null || ownerId.length()==0) {
+ throw new IllegalArgumentException("ownerId must not be null or empty");
+ }
+ this.ownerId = ownerId;
+ this.protocolVersion = protocolVersion;
+ }
+
+ /**
+  * Renders owner id, protocol version, the inherited and loop flags and the
+  * comma-separated string forms of all nested incoming announcements.
+  */
+ @Override
+ public String toString() {
+     final StringBuilder nested = new StringBuilder();
+     for (Announcement child : incomings) {
+         // separate entries with ", " once something has been written
+         if (nested.length() != 0) {
+             nested.append(", ");
+         }
+         nested.append(child);
+     }
+     final StringBuilder text = new StringBuilder("Announcement[ownerId=");
+     text.append(getOwnerId());
+     text.append(", protocolVersion=").append(protocolVersion);
+     text.append(", inherited=").append(isInherited());
+     text.append(", loop=").append(loop);
+     text.append(", incomings=").append(nested).append("]");
+     return text.toString();
+ }
+
+ /** Checks whether this announcement carries the protocol version of this implementation. **/
+ public boolean isCorrectVersion() {
+     return PROTOCOL_VERSION == protocolVersion;
+ }
+
+ /**
+  * Checks whether this is a valid announcement, containing the minimal information.
+  * <p>
+  * A loop announcement only needs a non-empty owner id. Any other announcement
+  * additionally needs the correct protocol version and a local cluster with at
+  * least one instance, one of which must be the owner. Any exception raised while
+  * inspecting the local cluster makes the announcement invalid.
+  *
+  * @return true if the announcement is valid, false otherwise
+  */
+ public boolean isValid() {
+     if (ownerId == null || ownerId.length() == 0) {
+         return false;
+     }
+     if (loop) {
+         return true;
+     }
+     if (!isCorrectVersion() || localCluster == null) {
+         return false;
+     }
+     try {
+         final List<InstanceDescription> members = localCluster.getInstances();
+         if (members == null || members.isEmpty()) {
+             return false;
+         }
+         // deliberately visits every member (no early exit), as the original did
+         boolean ownerFound = false;
+         for (InstanceDescription member : members) {
+             if (member.getSlingId().equals(ownerId)) {
+                 ownerFound = true;
+             }
+         }
+         return ownerFound;
+     } catch (Exception e) {
+         return false;
+     }
+ }
+
+ /**
+  * Sets the inherited flag.
+  *
+  * @param inherited true if this announcement is the response of a topology connect
+  */
+ public void setInherited(final boolean inherited) {
+     this.inherited = inherited;
+ }
+
+ /**
+  * Returns the inherited flag.
+  *
+  * @return true if this announcement is the response of a topology connect
+  */
+ public boolean isInherited() {
+     return this.inherited;
+ }
+
+ /**
+  * Sets the loop flag.
+  *
+  * @param loop true when this announcement should represent a loop detected in the topology connectors
+  */
+ public void setLoop(final boolean loop) {
+     this.loop = loop;
+ }
+
+ /**
+  * Sets the backoffInterval which the connector servlet passes back to the
+  * client to use as the next heartbeatInterval.
+  *
+  * @param backoffInterval the interval to store
+  */
+ public void setBackoffInterval(long backoffInterval) {
+     this.backoffInterval = backoffInterval;
+ }
+
+ /**
+  * Gets the backoffInterval which the connector servlet passes back to the
+  * client to use as the next heartbeatInterval.
+  *
+  * @return the stored interval; the field is initialised to -1
+  */
+ public long getBackoffInterval() {
+     return backoffInterval;
+ }
+
+ /**
+  * Sets the resetBackoff flag.
+  *
+  * @param resetBackoff the new flag value
+  */
+ public void setResetBackoff(boolean resetBackoff) {
+     this.resetBackoff = resetBackoff;
+ }
+
+ /**
+  * Gets the resetBackoff flag.
+  *
+  * @return the current flag value
+  */
+ public boolean getResetBackoff() {
+     return this.resetBackoff;
+ }
+
+ /**
+  * Returns the loop flag.
+  *
+  * @return true when this announcement represents a loop detected in the topology connectors
+  */
+ public boolean isLoop() {
+     return this.loop;
+ }
+
+ /**
+  * Returns the protocol version of this announcement.
+  *
+  * @return the protocol version given at construction time
+  */
+ public int getProtocolVersion() {
+     return this.protocolVersion;
+ }
+
+ /**
+  * Sets the information about the server this announcement came from.
+  *
+  * @param serverInfo the server information to store
+  */
+ public void setServerInfo(final String serverInfo) {
+     this.serverInfo = serverInfo;
+ }
+
+ /**
+  * Returns the information about the server this announcement came from.
+  *
+  * @return the stored server information, or null if none was set
+  */
+ public String getServerInfo() {
+     return this.serverInfo;
+ }
+
+ /**
+  * Returns the sling id of the owner of this announcement.
+  * <p>
+  * The owner is the instance which initiated the topology connection.
+  *
+  * @return the owner id given at construction time
+  */
+ public String getOwnerId() {
+     return this.ownerId;
+ }
+
+ /**
+  * Converts this announcement into a json object, including the time-related
+  * properties ('created' and, if positive, 'backoffInterval').
+  *
+  * @return the json representation of this announcement
+  * @throws JSONException if building the json object fails
+  */
+ public JSONObject asJSONObject() throws JSONException {
+     final boolean filterTimes = false;
+     return asJSONObject(filterTimes);
+ }
+
+ /** Convert this announcement into a json object **/
+ private JSONObject asJSONObject(boolean filterTimes) throws JSONException {
+ JSONObject announcement = new JSONObject();
+ announcement.put("ownerId", ownerId);
+ announcement.put("protocolVersion", protocolVersion);
+ // SLING-3389: leaving the 'created' property in the announcement
+ // for backwards compatibility!
+ if (!filterTimes) {
+ announcement.put("created", System.currentTimeMillis());
+ }
+ announcement.put("inherited", inherited);
+ if (loop) {
+ announcement.put("loop", loop);
+ }
+ if (serverInfo != null) {
+ announcement.put("serverInfo", serverInfo);
+ }
+ if (localCluster!=null) {
+ announcement.put("localClusterView", asJSON(localCluster));
+ }
+ if (!filterTimes && backoffInterval>0) {
+ announcement.put("backoffInterval", backoffInterval);
+ }
+ if (resetBackoff) {
+ announcement.put("resetBackoff", resetBackoff);
+ }
+ JSONArray incomingAnnouncements = new JSONArray();
+ for (Iterator<Announcement> it = incomings.iterator(); it.hasNext();) {
+ Announcement incoming = it.next();
+ incomingAnnouncements.put(incoming.asJSONObject(filterTimes));
+ }
+ announcement.put("topologyAnnouncements", incomingAnnouncements);
+ return announcement;
+ }
+
+ /**
+  * Creates an announcement from json.
+  * <p>
+  * A missing 'protocolVersion' is mapped to -1. If the json carries a true 'loop'
+  * property, the result is returned right after the backoff properties were read,
+  * i.e. without local cluster view, inherited flag, server info or nested announcements.
+  *
+  * @param topologyAnnouncementJSON the json string to parse
+  * @return the parsed announcement
+  * @throws JSONException if the json cannot be parsed or a required property is missing
+  */
+ public static Announcement fromJSON(final String topologyAnnouncementJSON)
+         throws JSONException {
+     final JSONObject json = new JSONObject(topologyAnnouncementJSON);
+     final String ownerId = json.getString("ownerId");
+     final int protocolVersion = json.has("protocolVersion")
+             ? json.getInt("protocolVersion")
+             : -1;
+     final Announcement parsed = new Announcement(ownerId, protocolVersion);
+     if (json.has("backoffInterval")) {
+         parsed.backoffInterval = json.getLong("backoffInterval");
+     }
+     if (json.has("resetBackoff")) {
+         parsed.resetBackoff = json.getBoolean("resetBackoff");
+     }
+     if (json.has("loop") && json.getBoolean("loop")) {
+         parsed.setLoop(true);
+         return parsed;
+     }
+     final ClusterView localClusterView = asClusterView(json.getString("localClusterView"));
+     final JSONArray nested = json.getJSONArray("topologyAnnouncements");
+
+     if (json.has("inherited")) {
+         parsed.inherited = json.getBoolean("inherited");
+     }
+     if (json.has("serverInfo")) {
+         parsed.serverInfo = json.getString("serverInfo");
+     }
+     parsed.setLocalCluster(localClusterView);
+     for (int idx = 0; idx < nested.length(); idx++) {
+         // nested announcements are parsed recursively from their string form
+         parsed.addIncomingTopologyAnnouncement(fromJSON(nested.getString(idx)));
+     }
+     return parsed;
+ }
+
+ /** create a clusterview from json **/
+ private static ClusterView asClusterView(final String localClusterViewJSON)
+ throws JSONException {
+ JSONObject obj = new JSONObject(localClusterViewJSON);
+ DefaultClusterView clusterView = new DefaultClusterView(
+ obj.getString("id"));
+ JSONArray instancesObj = obj.getJSONArray("instances");
+
+ for (int i = 0; i < instancesObj.length(); i++) {
+ JSONObject anInstance = instancesObj.getJSONObject(i);
+ clusterView.addInstanceDescription(asInstance(anInstance));
+ }
+
+ return clusterView;
+ }
+
+ /** convert a clusterview into json **/
+ private static JSONObject asJSON(final ClusterView clusterView)
+ throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put("id", clusterView.getId());
+ JSONArray instancesObj = new JSONArray();
+ List<InstanceDescription> instances = clusterView.getInstances();
+ for (Iterator<InstanceDescription> it = instances.iterator(); it
+ .hasNext();) {
+ InstanceDescription instanceDescription = it.next();
+ instancesObj.put(asJSON(instanceDescription));
+ }
+ obj.put("instances", instancesObj);
+ return obj;
+ }
+
+ /**
+  * Creates an instance description from json, reading the 'isLeader', 'slingId'
+  * and 'properties' entries. The result is a NonLocalInstanceDescription whose
+  * first constructor argument is passed as null.
+  *
+  * @param anInstance the json object describing the instance
+  * @return the instance description
+  * @throws JSONException if a required property is missing or has the wrong type
+  */
+ private static DefaultInstanceDescription asInstance(
+         final JSONObject anInstance) throws JSONException {
+     final boolean isLeader = anInstance.getBoolean("isLeader");
+     final String slingId = anInstance.getString("slingId");
+
+     final JSONObject propertiesJson = anInstance.getJSONObject("properties");
+     final Map<String, String> properties = new HashMap<String, String>();
+     for (Iterator<String> keys = propertiesJson.keys(); keys.hasNext();) {
+         final String name = keys.next();
+         properties.put(name, propertiesJson.getString(name));
+     }
+
+     return new NonLocalInstanceDescription(null, isLeader, slingId, properties);
+ }
+
+ /** convert an instance description into a json object **/
+ private static JSONObject asJSON(final InstanceDescription instanceDescription)
+ throws JSONException {
+ JSONObject obj = new JSONObject();
+ obj.put("slingId", instanceDescription.getSlingId());
+ obj.put("isLeader", instanceDescription.isLeader());
+ ClusterView cluster = instanceDescription.getClusterView();
+ if (cluster != null) {
+ obj.put("cluster", cluster.getId());
+ }
+ JSONObject propertiesObj = new JSONObject();
+ Map<String, String> propertiesMap = instanceDescription.getProperties();
+ for (Iterator<Entry<String, String>> it = propertiesMap.entrySet()
+ .iterator(); it.hasNext();) {
+ Entry<String, String> entry = it.next();
+ propertiesObj.put(entry.getKey(), entry.getValue());
+ }
+ obj.put("properties", propertiesObj);
+ return obj;
+ }
+
+ /**
+  * Sets the local cluster view.
+  *
+  * @param localCluster the cluster view to store
+  */
+ public void setLocalCluster(ClusterView localCluster) {
+     this.localCluster = localCluster;
+ }
+
+ /**
+  * Adds an incoming announcement to this announcement.
+  *
+  * @param incomingTopologyAnnouncement the announcement to append to the list of incomings
+  */
+ public void addIncomingTopologyAnnouncement(
+         Announcement incomingTopologyAnnouncement) {
+     this.incomings.add(incomingTopologyAnnouncement);
+ }
+
+ /**
+  * Converts this announcement into its json string form.
+  *
+  * @return the string form of {@link #asJSONObject()}
+  * @throws JSONException if building the json object fails
+  */
+ public String asJSON() throws JSONException {
+     final JSONObject json = asJSONObject();
+     return json.toString();
+ }
+
+ /**
+  * Returns the key which is unique to this announcement.
+  *
+  * @return the owner id, which serves as the primary key
+  */
+ public String getPrimaryKey() {
+     return this.ownerId;
+ }
+
+ /**
+  * Returns the instances contained in this announcement: those of the local
+  * cluster followed by those of all nested incoming announcements (recursively).
+  * <p>
+  * An announcement without a local cluster contributes no local instances
+  * instead of failing with a NullPointerException. That is the case for a loop
+  * announcement created by {@link #fromJSON(String)}, and for any announcement
+  * on which {@link #setLocalCluster(ClusterView)} was never called.
+  *
+  * @return a new collection holding all contained instances, never null
+  */
+ public Collection<InstanceDescription> listInstances() {
+     final Collection<InstanceDescription> instances = new LinkedList<InstanceDescription>();
+     if (localCluster != null) {
+         final List<InstanceDescription> localInstances = localCluster.getInstances();
+         // isValid() also treats a null instance list as possible
+         if (localInstances != null) {
+             instances.addAll(localInstances);
+         }
+     }
+
+     for (Announcement incomingAnnouncement : incomings) {
+         instances.addAll(incomingAnnouncement.listInstances());
+     }
+     return instances;
+ }
+
+ /**
+  * Persists this announcement using the given 'announcements' resource,
+  * under which a child named after the primary key is created if it does not
+  * exist yet. Otherwise the existing child's 'topologyAnnouncement' property
+  * is overwritten with the current json.
+  * <p>
+  * This method does not call commit on the resource resolver itself.
+  *
+  * @param announcementsResource the parent resource under which the announcement is stored
+  * @throws PersistenceException if the child cannot be created, or if an existing
+  *         child cannot be adapted to a ModifiableValueMap
+  * @throws JSONException if this announcement cannot be converted to json
+  **/
+ public void persistTo(Resource announcementsResource)
+         throws PersistenceException, JSONException {
+     final String primaryKey = getPrimaryKey();
+     final Resource announcementChildResource = announcementsResource.getChild(primaryKey);
+
+     // SLING-2967 used to introduce 'resetting the created time' here
+     // in order to become machine-clock independent.
+     // With introduction of SLING-3389, where we dont store any
+     // announcement-heartbeat-dates anymore at all, this resetting here
+     // became unnecessary.
+
+     final String announcementJson = asJSON();
+     if (announcementChildResource == null) {
+         final ResourceResolver resourceResolver = announcementsResource.getResourceResolver();
+         final Map<String, Object> properties = new HashMap<String, Object>();
+         properties.put("topologyAnnouncement", announcementJson);
+         resourceResolver.create(announcementsResource, primaryKey, properties);
+         return;
+     }
+
+     final ModifiableValueMap announcementChildMap = announcementChildResource.adaptTo(ModifiableValueMap.class);
+     if (announcementChildMap == null) {
+         // adaptTo returns null when the resource cannot be adapted (e.g. it is not writable);
+         // report that explicitly rather than failing with a NullPointerException
+         throw new PersistenceException("persistTo: could not adapt resource to ModifiableValueMap: "
+                 + announcementChildResource.getPath());
+     }
+     announcementChildMap.put("topologyAnnouncement", announcementJson);
+ }
+
+ /**
+  * Removes all directly nested announcements that are inherited and
+  * whose owner id equals the given one.
+  *
+  * @param ownerId the owner id to match
+  */
+ public void removeInherited(final String ownerId) {
+     final Iterator<Announcement> candidates = incomings.iterator();
+     while (candidates.hasNext()) {
+         final Announcement candidate = candidates.next();
+         if (!candidate.isInherited()) {
+             continue;
+         }
+         if (candidate.getOwnerId().equals(ownerId)) {
+             // then filter this
+             candidates.remove();
+         }
+     }
+ }
+
+ /**
+  * Compares this announcement with another one by comparing the string forms
+  * of their json representations, ignoring the 'created' property - which gets
+  * added to the JSON object automatically due to SLING-3389
+  * wire-backwards-compatibility - and the backoffInterval introduced as part
+  * of SLING-3382.
+  *
+  * @param announcement the announcement to compare with
+  * @return true if both json string forms (without the time properties) are equal
+  * @throws JSONException if one of the announcements cannot be converted to json
+  */
+ public boolean correspondsTo(Announcement announcement) throws JSONException {
+     final JSONObject mine = asJSONObject(true);
+     final JSONObject theirs = announcement.asJSONObject(true);
+     final String myText = mine.toString();
+     return myText.equals(theirs.toString());
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/Announcement.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementFilter.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementFilter.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementFilter.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementFilter.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.announcement;
+
+/**
+ * Filter used internally during announcement processing to decide
+ * whether an announcement is accepted.
+ **/
+public interface AnnouncementFilter {
+
+ /**
+ * Check if the provided announcement, which was received by the provided
+ * slingId can be accepted or not.
+ *
+ * @param receivingSlingId the sling id of the instance which received the announcement
+ * @param announcement the announcement to check
+ * @return true if the announcement can be accepted, false otherwise
+ **/
+ boolean accept(String receivingSlingId, Announcement announcement);
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistry.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistry.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistry.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistry.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.announcement;
+
+import java.util.Collection;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+
+/**
+ * The announcement registry keeps track of all the announcements that this
+ * instance either received by a joined topology connector or that a topology
+ * connector inherited from the counterpart (the topology connector servlet)
+ */
+public interface AnnouncementRegistry {
+
+ /**
+ * Register the given announcement - and returns the backoff interval (in seconds)
+ * for stable connectors
+ * - or -1 if the registration was not successful (likely indicating a loop)
+ * @param topologyAnnouncement the announcement to register
+ * @return the backoff interval (in seconds) for stable connectors
+ * - or -1 if the registration was not successful (likely indicating a loop)
+ */
+ long registerAnnouncement(Announcement topologyAnnouncement);
+
+ /**
+ * list all announcements that were received by instances in the local cluster
+ * @param localClusterView the view of the local cluster
+ **/
+ Collection<Announcement> listAnnouncementsInSameCluster(ClusterView localClusterView);
+
+ /** list all announcements that were received (incoming or inherited) by this instance **/
+ Collection<Announcement> listLocalAnnouncements();
+
+ /** list all announcements that this instance received (incoming) **/
+ Collection<CachedAnnouncement> listLocalIncomingAnnouncements();
+
+ /** Check for expired announcements and remove any if applicable **/
+ void checkExpiredAnnouncements();
+
+ /**
+ * Returns the list of instances contained in all non-expired announcements of this registry
+ * @param localClusterView the view of the local cluster
+ **/
+ Collection<InstanceDescription> listInstances(ClusterView localClusterView);
+
+ /**
+ * Add all registered announcements to the given target announcement that are accepted by the given filter
+ * @param target the announcement to which the accepted announcements are added
+ * @param localClusterView the view of the local cluster
+ * @param filter the filter deciding which announcements are accepted
+ **/
+ void addAllExcept(Announcement target, ClusterView localClusterView, AnnouncementFilter filter);
+
+ /**
+ * Unregister the announcement owned by the given slingId
+ * @param ownerId the sling id of the owner of the announcement to unregister
+ **/
+ void unregisterAnnouncement(String ownerId);
+
+ /**
+ * Whether or not the given owner has an active (ie not expired) announcement registered
+ * @param ownerId the sling id of the owner to check
+ **/
+ boolean hasActiveAnnouncement(String ownerId);
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistryImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistryImpl.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistryImpl.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistryImpl.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.announcement;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.base.connectors.BaseConfig;
+import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
+import org.apache.sling.settings.SlingSettingsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of the AnnouncementRegistry which
+ * handles JSON-backed announcements and does so by storing
+ * them in a repository location like /var/discovery/impl/clusterNodes/$slingId/announcements.
+ */
+@Component
+@Service(value = AnnouncementRegistry.class)
+public class AnnouncementRegistryImpl implements AnnouncementRegistry {
+
+ /** logger named after the runtime class of this instance **/
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** factory used to obtain administrative resource resolvers for repository access **/
+ @Reference
+ private ResourceResolverFactory resourceResolverFactory;
+
+ /** settings service from which the local sling id is read in activate() **/
+ @Reference
+ private SlingSettingsService settingsService;
+
+ /** the sling id of the local instance - assigned in activate() **/
+ private String slingId;
+
+ /** configuration - used here to look up the cluster instances path **/
+ @Reference
+ private BaseConfig config;
+
+ /**
+  * Creates a registry wired with the given collaborators (see testConstructor)
+  * and activates it right away.
+  *
+  * @param resourceResolverFactory the factory to assign
+  * @param slingSettingsService the settings service to assign
+  * @param config the configuration to assign
+  * @return the wired and activated registry
+  */
+ public static AnnouncementRegistryImpl testConstructorAndActivate(ResourceResolverFactory resourceResolverFactory,
+         SlingSettingsService slingSettingsService, BaseConfig config) {
+     final AnnouncementRegistryImpl activated = testConstructor(resourceResolverFactory, slingSettingsService, config);
+     activated.activate();
+     return activated;
+ }
+
+ /**
+  * Creates a registry and assigns the given collaborators directly to its
+  * fields, without activating it.
+  *
+  * @param resourceResolverFactory the factory to assign
+  * @param slingSettingsService the settings service to assign
+  * @param config the configuration to assign
+  * @return the wired, not yet activated registry
+  */
+ public static AnnouncementRegistryImpl testConstructor(ResourceResolverFactory resourceResolverFactory,
+         SlingSettingsService slingSettingsService, BaseConfig config) {
+     final AnnouncementRegistryImpl wired = new AnnouncementRegistryImpl();
+     wired.config = config;
+     wired.settingsService = slingSettingsService;
+     wired.resourceResolverFactory = resourceResolverFactory;
+     return wired;
+ }
+
+ /** Activation hook: remembers the local sling id provided by the settings service. **/
+ @Activate
+ protected void activate() {
+     this.slingId = this.settingsService.getSlingId();
+ }
+
+ /**
+ * cache of announcements, keyed by owner id (entries are removed by
+ * owner id in unregisterAnnouncement)
+ **/
+ private final Map<String,CachedAnnouncement> ownAnnouncementsCache =
+ new HashMap<String,CachedAnnouncement>();
+
+ public synchronized void unregisterAnnouncement(final String ownerId) {
+ if (ownerId==null || ownerId.length()==0) {
+ throw new IllegalArgumentException("ownerId must not be null or empty");
+ }
+ // remove from the cache - even if there's an error afterwards
+ ownAnnouncementsCache.remove(ownerId);
+
+ if (resourceResolverFactory == null) {
+ logger.error("unregisterAnnouncement: resourceResolverFactory is null");
+ return;
+ }
+ ResourceResolver resourceResolver = null;
+ try {
+ resourceResolver = resourceResolverFactory
+ .getAdministrativeResourceResolver(null);
+
+ final String path = config.getClusterInstancesPath()
+ + "/"
+ + slingId
+ + "/announcements/" + ownerId;
+ final Resource announcementsResource = resourceResolver.getResource(path);
+ if (announcementsResource!=null) {
+ resourceResolver.delete(announcementsResource);
+ resourceResolver.commit();
+ }
+
+ } catch (LoginException e) {
+ logger.error(
+ "unregisterAnnouncement: could not log in administratively: "
+ + e, e);
+ throw new RuntimeException("Could not log in to repository (" + e
+ + ")", e);
+ } catch (PersistenceException e) {
+ logger.error("unregisterAnnouncement: got a PersistenceException: "
+ + e, e);
+ throw new RuntimeException(
+ "Exception while talking to repository (" + e + ")", e);
+ } finally {
+ if (resourceResolver != null) {
+ resourceResolver.close();
+ }
+ }
+ }
+
+ /**
+  * Lists the local announcements by passing a new, empty list to
+  * fillWithCachedAnnouncements and returning its result.
+  */
+ public synchronized Collection<Announcement> listLocalAnnouncements() {
+     final Collection<Announcement> target = new LinkedList<Announcement>();
+     return fillWithCachedAnnouncements(target);
+ }
+
+ /**
+  * Lists all cached announcements that are neither inherited nor expired.
+  *
+  * @return a new collection holding the matching cached announcements
+  */
+ public synchronized Collection<CachedAnnouncement> listLocalIncomingAnnouncements() {
+     final Collection<CachedAnnouncement> incoming = new LinkedList<CachedAnnouncement>();
+     for (CachedAnnouncement cached : ownAnnouncementsCache.values()) {
+         // expiry is only consulted for non-inherited announcements
+         final boolean keep = !cached.getAnnouncement().isInherited() && !cached.hasExpired();
+         if (keep) {
+             incoming.add(cached);
+         }
+     }
+     return incoming;
+ }
+
+ /**
+  * Finds the first instance of the given cluster view that reports itself as local.
+  *
+  * @param localClusterView the cluster view to search
+  * @return the first local instance, or null if the view contains none
+  */
+ private final InstanceDescription getLocalInstanceDescription(final ClusterView localClusterView) {
+     InstanceDescription found = null;
+     final Iterator<InstanceDescription> members = localClusterView.getInstances().iterator();
+     while (found == null && members.hasNext()) {
+         final InstanceDescription member = members.next();
+         if (member.isLocal()) {
+             found = member;
+         }
+     }
+     return found;
+ }
+
+ public synchronized Collection<Announcement> listAnnouncementsInSameCluster(final ClusterView localClusterView) {
+ logger.debug("listAnnouncementsInSameCluster: start. localClusterView: {}", localClusterView);
+ if (localClusterView==null) {
+ throw new IllegalArgumentException("clusterView must not be null");
+ }
+ ResourceResolver resourceResolver = null;
+ final Collection<Announcement> incomingAnnouncements = new LinkedList<Announcement>();
+ final InstanceDescription localInstance = getLocalInstanceDescription(localClusterView);
+ try {
+ resourceResolver = resourceResolverFactory
+ .getAdministrativeResourceResolver(null);
+
+ Resource clusterInstancesResource = ResourceHelper
+ .getOrCreateResource(
+ resourceResolver,
+ config.getClusterInstancesPath());
+
+ Iterator<Resource> it0 = clusterInstancesResource.getChildren()
+ .iterator();
+ while (it0.hasNext()) {
+ Resource aClusterInstanceResource = it0.next();
+ final String instanceId = aClusterInstanceResource.getName();
+ logger.debug("listAnnouncementsInSameCluster: handling clusterInstance: {}", instanceId);
+ if (localInstance!=null && localInstance.getSlingId().equals(instanceId)) {
+ // this is the local instance then - which we serve from the cache only
+ logger.debug("listAnnouncementsInSameCluster: matched localInstance, filling with cache: {}", instanceId);
+ fillWithCachedAnnouncements(incomingAnnouncements);
+ continue;
+ }
+
+ //TODO: add ClusterView.contains(instanceSlingId) for convenience to next api change
+ if (!contains(localClusterView, instanceId)) {
+ logger.debug("listAnnouncementsInSameCluster: instance is not in my view, ignoring: {}", instanceId);
+ // then the instance is not in my view, hence ignore its announcements
+ // (corresponds to earlier expiry-handling)
+ continue;
+ }
+ final Resource announcementsResource = aClusterInstanceResource
+ .getChild("announcements");
+ if (announcementsResource == null) {
+ logger.debug("listAnnouncementsInSameCluster: instance has no announcements: {}", instanceId);
+ continue;
+ }
+ logger.debug("listAnnouncementsInSameCluster: instance has announcements: {}", instanceId);
+ Iterator<Resource> it = announcementsResource.getChildren()
+ .iterator();
+ Announcement topologyAnnouncement;
+ while (it.hasNext()) {
+ Resource anAnnouncement = it.next();
+ topologyAnnouncement = Announcement
+ .fromJSON(anAnnouncement
+ .adaptTo(ValueMap.class).get(
+ "topologyAnnouncement",
+ String.class));
+ logger.debug("listAnnouncementsInSameCluster: found announcement: {}", topologyAnnouncement);
+ incomingAnnouncements.add(topologyAnnouncement);
+ // SLING-3389: no longer check for expired announcements -
+ // instead make use of the fact that this instance
+ // has a clusterView and that every live instance
+ // is responsible of cleaning up expired announcements
+ // with the repository
+ }
+ }
+ // since SLING-3389 this method does only read operations, hence
+ // no commit necessary anymore - close happens in below finally block
+ } catch (LoginException e) {
+ logger.error(
+ "listAnnouncementsInSameCluster: could not log in administratively: " + e, e);
+ throw new RuntimeException("Could not log in to repository (" + e
+ + ")", e);
+ } catch (PersistenceException e) {
+ logger.error("listAnnouncementsInSameCluster: got a PersistenceException: " + e, e);
+ throw new RuntimeException(
+ "Exception while talking to repository (" + e + ")", e);
+ } catch (JSONException e) {
+ logger.error("listAnnouncementsInSameCluster: got a JSONException: " + e, e);
+ throw new RuntimeException("Exception while converting json (" + e
+ + ")", e);
+ } finally {
+ if (resourceResolver != null) {
+ resourceResolver.close();
+ }
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("listAnnouncementsInSameCluster: result: "+incomingAnnouncements.size());
+ }
+ return incomingAnnouncements;
+ }
+
+ /**
+  * Appends the announcement of every non-expired cache entry to the given collection.
+  *
+  * @param incomingAnnouncements the collection to append to
+  * @return the very same collection that was passed in
+  */
+ private final Collection<Announcement> fillWithCachedAnnouncements(
+         final Collection<Announcement> incomingAnnouncements) {
+     for (final Entry<String, CachedAnnouncement> cacheEntry : ownAnnouncementsCache.entrySet()) {
+         if (!cacheEntry.getValue().hasExpired()) {
+             // only announcements that have not timed out are handed out
+             incomingAnnouncements.add(cacheEntry.getValue().getAnnouncement());
+         }
+     }
+     return incomingAnnouncements;
+ }
+
+ /**
+  * Tells whether the given cluster view lists an instance with the given sling id.
+  *
+  * @param clusterView the view whose instances are scanned
+  * @param instanceId the sling id to look for
+  * @return {@code true} if any instance of the view has that sling id
+  */
+ private final boolean contains(final ClusterView clusterView, final String instanceId) {
+     for (final InstanceDescription member : clusterView.getInstances()) {
+         final String memberId = member.getSlingId();
+         if (memberId.equals(instanceId)) {
+             // fine, then the instance is in the view
+             return true;
+         }
+     }
+     return false;
+ }
+
+ /**
+  * Tells whether the cache holds a non-expired announcement for the given owner.
+  *
+  * @param ownerId the id of the announcement's owner, must not be null or empty
+  * @return {@code true} if an announcement of that owner is cached and has not expired
+  * @throws IllegalArgumentException if {@code ownerId} is null or empty
+  */
+ public synchronized boolean hasActiveAnnouncement(final String ownerId) {
+     final boolean missingOwnerId = (ownerId==null) || (ownerId.length()==0);
+     if (missingOwnerId) {
+         throw new IllegalArgumentException("ownerId must not be null or empty: "+ownerId);
+     }
+     final CachedAnnouncement cached = ownAnnouncementsCache.get(ownerId);
+     return cached!=null && !cached.hasExpired();
+ }
+
+ /**
+  * Registers an incoming topology announcement with this registry.
+  * <p>
+  * If an announcement of the same owner is already cached and corresponds to the
+  * incoming one, only the in-memory ping is updated. Otherwise the cached entry
+  * (if any) is dropped, the announcement is persisted below
+  * {@code <clusterInstancesPath>/<slingId>/announcements} and cached again.
+  *
+  * @param topologyAnnouncement the announcement to register, must not be null
+  * @return {@code -1} if the announcement is invalid, if no resourceResolverFactory
+  *         is available or if the owner is already attached via another cached
+  *         announcement; the value returned by {@code CachedAnnouncement.registerPing}
+  *         if nothing changed; {@code 0} after the announcement was persisted
+  * @throws IllegalArgumentException if {@code topologyAnnouncement} is null
+  * @throws RuntimeException wrapping a LoginException, PersistenceException or
+  *         JSONException raised while persisting
+  */
+ public synchronized long registerAnnouncement(final Announcement topologyAnnouncement) {
+     if (topologyAnnouncement==null) {
+         throw new IllegalArgumentException("topologyAnnouncement must not be null");
+     }
+     if (!topologyAnnouncement.isValid()) {
+         logger.warn("topologyAnnouncement is not valid");
+         return -1;
+     }
+     if (resourceResolverFactory == null) {
+         logger.error("registerAnnouncement: resourceResolverFactory is null");
+         return -1;
+     }
+
+     final String ownerId = topologyAnnouncement.getOwnerId();
+     final CachedAnnouncement cachedAnnouncement = ownAnnouncementsCache.get(ownerId);
+     if (cachedAnnouncement!=null) {
+         logger.debug("registerAnnouncement: got existing cached announcement for ownerId={}", ownerId);
+         try {
+             if (topologyAnnouncement.correspondsTo(cachedAnnouncement.getAnnouncement())) {
+                 // then nothing has changed with this announcement, so just update
+                 // the heartbeat and be done.
+                 // this should actually be the normal case for a stable connector
+                 logger.debug("registerAnnouncement: nothing has changed, only updating heartbeat in-memory.");
+                 return cachedAnnouncement.registerPing(topologyAnnouncement, config);
+             }
+             logger.debug("registerAnnouncement: incoming announcement differs from existing one!");
+         } catch (JSONException e) {
+             // a failed comparison is treated like a changed announcement:
+             // fall through and re-register below
+             logger.error("registerAnnouncement: got JSONException while converting incoming announcement to JSON: "+e, e);
+         }
+         // otherwise the repository and the cache require to be updated
+         // resetting the cache therefore at this point already
+         ownAnnouncementsCache.remove(ownerId);
+     } else {
+         logger.debug("registerAnnouncement: no cached announcement yet for ownerId={}", ownerId);
+     }
+
+     logger.debug("registerAnnouncement: getting the list of all local announcements");
+     final Collection<Announcement> announcements = new LinkedList<Announcement>();
+     fillWithCachedAnnouncements(announcements);
+     logger.debug("registerAnnouncement: list returned: {}", announcements.size());
+
+     for (final Announcement announcement : announcements) {
+         if (announcement.getOwnerId().equals(ownerId)) {
+             // then this is from the same owner - skip this
+             continue;
+         }
+         // analyse to see if any of the instances in the announcement
+         // include the new owner
+         for (final InstanceDescription instanceDescription : announcement.listInstances()) {
+             if (ownerId.equals(instanceDescription.getSlingId())) {
+                 logger.info("registerAnnouncement: already have this instance attached: "
+                         + instanceDescription.getSlingId());
+                 return -1;
+             }
+         }
+     }
+
+     ResourceResolver resourceResolver = null;
+     try {
+         resourceResolver = resourceResolverFactory
+                 .getAdministrativeResourceResolver(null);
+
+         final Resource announcementsResource = ResourceHelper
+                 .getOrCreateResource(
+                         resourceResolver,
+                         config.getClusterInstancesPath()
+                                 + "/"
+                                 + slingId
+                                 + "/announcements");
+
+         topologyAnnouncement.persistTo(announcementsResource);
+         resourceResolver.commit();
+         // only cache once the repository write went through
+         ownAnnouncementsCache.put(ownerId,
+                 new CachedAnnouncement(topologyAnnouncement, config));
+     } catch (LoginException e) {
+         logger.error(
+                 "registerAnnouncement: could not log in administratively: "
+                         + e, e);
+         throw new RuntimeException("Could not log in to repository (" + e
+                 + ")", e);
+     } catch (PersistenceException e) {
+         logger.error("registerAnnouncement: got a PersistenceException: "
+                 + e, e);
+         throw new RuntimeException(
+                 "Exception while talking to repository (" + e + ")", e);
+     } catch (JSONException e) {
+         logger.error("registerAnnouncement: got a JSONException: " + e, e);
+         throw new RuntimeException("Exception while converting json (" + e
+                 + ")", e);
+     } finally {
+         if (resourceResolver != null) {
+             resourceResolver.close();
+         }
+     }
+     return 0;
+ }
+
+ public synchronized void addAllExcept(final Announcement target, final ClusterView clusterView,
+ final AnnouncementFilter filter) {
+ ResourceResolver resourceResolver = null;
+ try {
+ resourceResolver = resourceResolverFactory
+ .getAdministrativeResourceResolver(null);
+
+ final Resource clusterInstancesResource = ResourceHelper
+ .getOrCreateResource(
+ resourceResolver,
+ config.getClusterInstancesPath());
+
+ final Iterator<Resource> it0 = clusterInstancesResource.getChildren()
+ .iterator();
+ Resource announcementsResource;
+ while (it0.hasNext()) {
+ final Resource aClusterInstanceResource = it0.next();
+ final String instanceId = aClusterInstanceResource.getName();
+ //TODO: add ClusterView.contains(instanceSlingId) for convenience to next api change
+ if (!contains(clusterView, instanceId)) {
+ // then the instance is not in my view, hence dont propagate
+ // its announcements
+ // (corresponds to earlier expiry-handling)
+ continue;
+ }
+ announcementsResource = aClusterInstanceResource
+ .getChild("announcements");
+ if (announcementsResource == null) {
+ continue;
+ }
+ Iterator<Resource> it = announcementsResource.getChildren()
+ .iterator();
+ while (it.hasNext()) {
+ Resource anAnnouncement = it.next();
+ if (logger.isDebugEnabled()) {
+ logger.debug("addAllExcept: anAnnouncement="
+ + anAnnouncement);
+ }
+ Announcement topologyAnnouncement;
+ topologyAnnouncement = Announcement.fromJSON(anAnnouncement
+ .adaptTo(ValueMap.class).get(
+ "topologyAnnouncement", String.class));
+ if (filter != null && !filter.accept(aClusterInstanceResource.getName(), topologyAnnouncement)) {
+ continue;
+ }
+ target.addIncomingTopologyAnnouncement(topologyAnnouncement);
+ }
+ }
+ // even before SLING-3389 this method only did read operations,
+ // hence no commit was ever necessary. The close happens in the finally block
+ } catch (LoginException e) {
+ logger.error(
+ "handleEvent: could not log in administratively: " + e, e);
+ throw new RuntimeException("Could not log in to repository (" + e
+ + ")", e);
+ } catch (PersistenceException e) {
+ logger.error("handleEvent: got a PersistenceException: " + e, e);
+ throw new RuntimeException(
+ "Exception while talking to repository (" + e + ")", e);
+ } catch (JSONException e) {
+ logger.error("handleEvent: got a JSONException: " + e, e);
+ throw new RuntimeException("Exception while converting json (" + e
+ + ")", e);
+ } finally {
+ if (resourceResolver != null) {
+ resourceResolver.close();
+ }
+ }
+ }
+
+ /**
+  * Removes expired announcements from the cache and the repository, and deletes
+  * any announcement stored below this instance in the repository that has no
+  * counterpart in the cache.
+  * <p>
+  * LoginException and PersistenceException raised during the repository cleanup
+  * are logged and not rethrown.
+  */
+ public synchronized void checkExpiredAnnouncements() {
+     for (Iterator<Entry<String, CachedAnnouncement>> it =
+             ownAnnouncementsCache.entrySet().iterator(); it.hasNext();) {
+         final Entry<String, CachedAnnouncement> entry = it.next();
+         final CachedAnnouncement cachedAnnouncement = entry.getValue();
+         if (!cachedAnnouncement.hasExpired()) {
+             continue;
+         }
+         // then we have an expiry.
+         // Read everything needed from the entry BEFORE removing it: the state
+         // of a Map.Entry is undefined once the backing map has been modified.
+         final String instanceId = entry.getKey();
+         final boolean inherited = cachedAnnouncement.getAnnouncement().isInherited();
+         it.remove();
+
+         logger.info("checkExpiredAnnouncements: topology connector of "+instanceId+
+                 " (to me="+slingId+
+                 ", inherited="+inherited+") has expired.");
+         deleteAnnouncementsOf(instanceId);
+     }
+     //SLING-4139 : also make sure there are no stale announcements
+     //             in the repository (from a crash or any other action).
+     //             The ownAnnouncementsCache is the authoritative set
+     //             of announcements that are registered to this
+     //             instance's registry - and the repository must not
+     //             contain any additional announcements
+     ResourceResolver resourceResolver = null;
+     try {
+         resourceResolver = resourceResolverFactory
+                 .getAdministrativeResourceResolver(null);
+         final Resource announcementsResource = ResourceHelper
+                 .getOrCreateResource(
+                         resourceResolver,
+                         config.getClusterInstancesPath()
+                                 + "/"
+                                 + slingId
+                                 + "/announcements");
+         for (final Resource res : announcementsResource.getChildren()) {
+             // ownerId is the slingId of the owner of the announcement (ie of the peer of the connector).
+             // let's check if we have that owner's announcement in the cache
+             final String ownerId = res.getName();
+
+             if (ownAnnouncementsCache.containsKey(ownerId)) {
+                 // fine then, we'll leave this announcement untouched
+                 continue;
+             }
+             // otherwise this announcement is likely from an earlier incarnation
+             // of this instance - hence stale - hence we must remove it now
+             // (SLING-4139)
+             ResourceHelper.deleteResource(resourceResolver,
+                     res.getPath());
+         }
+         resourceResolver.commit();
+         resourceResolver.close();
+         // mark as handled so the finally block does not revert/close again
+         resourceResolver = null;
+     } catch (LoginException e) {
+         logger.error(
+                 "checkExpiredAnnouncements: could not log in administratively when checking "
+                 + "for expired announcements of slingId="+slingId+": " + e, e);
+     } catch (PersistenceException e) {
+         logger.error(
+                 "checkExpiredAnnouncements: got PersistenceException when checking "
+                 + "for expired announcements of slingId="+slingId+": " + e, e);
+     } finally {
+         if (resourceResolver!=null) {
+             // only reached when the try block did not complete: discard pending changes
+             resourceResolver.revert();
+             resourceResolver.close();
+             resourceResolver = null;
+         }
+     }
+ }
+
+ /**
+  * Deletes the resource
+  * {@code <clusterInstancesPath>/<slingId>/announcements/<instanceId>}
+  * from the repository and commits the change.
+  * <p>
+  * LoginException and PersistenceException are logged and not rethrown.
+  *
+  * @param instanceId the name of the announcement resource to delete
+  */
+ private final void deleteAnnouncementsOf(final String instanceId) {
+     ResourceResolver resolver = null;
+     try {
+         resolver = resourceResolverFactory.getAdministrativeResourceResolver(null);
+         final String announcementPath = config.getClusterInstancesPath()
+                 + "/" + slingId + "/announcements/" + instanceId;
+         ResourceHelper.deleteResource(resolver, announcementPath);
+         resolver.commit();
+         resolver.close();
+         // success: nothing left for the finally block to clean up
+         resolver = null;
+     } catch (LoginException e) {
+         logger.error(
+                 "deleteAnnouncementsOf: could not log in administratively when deleting "
+                 + "announcements of instanceId="+instanceId+": " + e, e);
+     } catch (PersistenceException e) {
+         logger.error(
+                 "deleteAnnouncementsOf: got PersistenceException when deleting "
+                 + "announcements of instanceId="+instanceId+": " + e, e);
+     } finally {
+         if (resolver!=null) {
+             // the try block did not complete: discard pending changes and release
+             resolver.revert();
+             resolver.close();
+         }
+     }
+ }
+
+ /**
+  * Lists the instances contained in all announcements returned by
+  * {@code listAnnouncementsInSameCluster} for the given cluster view.
+  *
+  * @param localClusterView the current view of the local cluster
+  * @return the instances of all those announcements, possibly empty
+  */
+ public synchronized Collection<InstanceDescription> listInstances(final ClusterView localClusterView) {
+     logger.debug("listInstances: start. localClusterView: {}", localClusterView);
+     final Collection<InstanceDescription> instances = new LinkedList<InstanceDescription>();
+
+     final Collection<Announcement> announcements = listAnnouncementsInSameCluster(localClusterView);
+     if (announcements != null) {
+         for (final Announcement announcement : announcements) {
+             logger.debug("listInstances: adding announcement: {}", announcement);
+             instances.addAll(announcement.listInstances());
+         }
+         logger.debug("listInstances: announcements added. end. instances: {}", instances);
+     } else {
+         logger.debug("listInstances: no announcement found. end. instances: {}", instances);
+     }
+     return instances;
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistryImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/CachedAnnouncement.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/CachedAnnouncement.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/CachedAnnouncement.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/CachedAnnouncement.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.announcement;
+
+import org.apache.sling.discovery.base.connectors.BaseConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Small in-memory wrapper that pairs an {@link Announcement} with the
+ * time of its most recent ping.
+ * <p>
+ * With SLING-3389 the Announcement itself doesn't use the created
+ * (ie timeout) field anymore (it still has it currently for backwards
+ * compatibility on the wire-level), hence expiry is tracked here.
+ */
+public class CachedAnnouncement {
+
+    private static final Logger logger = LoggerFactory.getLogger(CachedAnnouncement.class);
+
+    /** Time (as of System.currentTimeMillis) of the most recent ping. */
+    private long lastPing = System.currentTimeMillis();
+
+    private final Announcement announcement;
+
+    /** Time from which the stable period is measured; reset on a resetBackoff. */
+    private long firstPing = System.currentTimeMillis();
+
+    /** Last backoff interval that was computed or received, -1 until then. */
+    private long backoffIntervalSeconds = -1;
+
+    private final BaseConfig config;
+
+    CachedAnnouncement(final Announcement announcement, final BaseConfig config) {
+        this.announcement = announcement;
+        this.config = config;
+    }
+
+    private long getConfiguredConnectorTimeout() {
+        return config.getConnectorPingTimeout();
+    }
+
+    private long getConfiguredConnectorInterval() {
+        return config.getConnectorPingInterval();
+    }
+
+    /**
+     * Tells whether the time since the last ping has reached the effective
+     * heartbeat timeout (which is interpreted in seconds).
+     */
+    public final boolean hasExpired() {
+        final long millisSinceLastPing = System.currentTimeMillis() - lastPing;
+        return millisSinceLastPing >= 1000*getEffectiveHeartbeatTimeout();
+    }
+
+    public final long getLastPing() {
+        return lastPing;
+    }
+
+    /** Returns the seconds left until the effective heartbeat timeout hits, counted from the last ping **/
+    public final long getSecondsUntilTimeout() {
+        final long millisSinceLastPing = System.currentTimeMillis() - lastPing;
+        final long millisLeft = 1000*getEffectiveHeartbeatTimeout() - millisSinceLastPing;
+        return millisLeft/1000;
+    }
+
+    /**
+     * The timeout actually applied: the configured connector timeout, or - if
+     * larger - the current backoff interval plus the configured slack
+     * between timeout and ping interval.
+     */
+    private final long getEffectiveHeartbeatTimeout() {
+        final long configuredGoodwill = getConfiguredConnectorTimeout() - getConfiguredConnectorInterval();
+        final long backoffBasedTimeout = backoffIntervalSeconds + configuredGoodwill;
+        return Math.max(getConfiguredConnectorTimeout(), backoffBasedTimeout);
+    }
+
+    /**
+     * Registers a heartbeat event, and returns the new resulting backoff interval -
+     * or 0 if no backoff is applicable yet.
+     *
+     * @param incomingAnnouncement the announcement that came with the ping
+     * @param config the configuration the backoff calculation is based on
+     * @return the new resulting backoff interval -
+     * or 0 if no backoff is applicable yet.
+     */
+    final long registerPing(Announcement incomingAnnouncement, BaseConfig config) {
+        lastPing = System.currentTimeMillis();
+
+        if (incomingAnnouncement.isInherited()) {
+            // we are the client here and inherited this announcement from the
+            // server. The server decides about backoff-ing, so we cannot
+            // instruct anything - we merely remember what the server told us.
+            backoffIntervalSeconds = incomingAnnouncement.getBackoffInterval();
+            logger.debug("registerPing: inherited announcement - hence returning 0");
+            return 0;
+        }
+
+        if (incomingAnnouncement.getResetBackoff()) {
+            // a resetBackoff restarts the stable period from now
+            firstPing = lastPing;
+            logger.debug("registerPing: got a resetBackoff - hence returning 0");
+            return 0;
+        }
+
+        // the longer the connector has been stable, the larger the backoff -
+        // capped by the configured backoff stable factor
+        final long stableMillis = lastPing - firstPing;
+        final long stableTimeouts = stableMillis / (1000 * config.getConnectorPingTimeout());
+        final long factor = Math.min(stableTimeouts, config.getBackoffStableFactor());
+        backoffIntervalSeconds = factor * config.getConnectorPingInterval();
+        return backoffIntervalSeconds;
+    }
+
+    public final Announcement getAnnouncement() {
+        return announcement;
+    }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/CachedAnnouncement.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/package-info.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/package-info.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/package-info.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/package-info.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides topology announcement implementations for discovery
+ * implementors that choose to extend from discovery.base
+ *
+ * @version 1.0.0
+ */
+@Version("1.0.0")
+package org.apache.sling.discovery.base.connectors.announcement;
+
+import aQute.bnd.annotation.Version;
+
Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/package-info.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/package-info.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/package-info.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/package-info.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/package-info.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides topology connector related classes for discovery
+ * implementors that choose to extend from discovery.base
+ *
+ * @version 1.0.0
+ */
+@Version("1.0.0")
+package org.apache.sling.discovery.base.connectors;
+
+import aQute.bnd.annotation.Version;
+
Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/package-info.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistry.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistry.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistry.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistry.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.ping;
+
+import java.net.URL;
+import java.util.Collection;
+
+import org.apache.sling.discovery.base.commons.ClusterViewService;
+
+/**
+ * Registry for topology connector clients: allows outgoing topology
+ * connectors to be registered, listed, pinged and unregistered.
+ */
+public interface ConnectorRegistry {
+
+ /**
+ * Registers an outgoing topology connector using the provided endpoint url.
+ *
+ * @param clusterViewService the cluster view service handed to the connector
+ * @param topologyConnectorEndpoint the url of the topology connector endpoint
+ * @return information about the connector client
+ * (NOTE(review): presumably the newly registered one - confirm with the implementation)
+ */
+ TopologyConnectorClientInformation registerOutgoingConnector(
+ ClusterViewService clusterViewService, URL topologyConnectorEndpoint);
+
+ /**
+ * Lists all outgoing topology connectors.
+ *
+ * @return the outgoing topology connectors
+ */
+ Collection<TopologyConnectorClientInformation> listOutgoingConnectors();
+
+ /**
+ * Pings all outgoing topology connectors.
+ *
+ * @param force NOTE(review): exact semantics are not visible here - presumably
+ * forces a ping even when one would otherwise be skipped; confirm with the implementation
+ */
+ void pingOutgoingConnectors(boolean force);
+
+ /**
+ * Unregisters an outgoing topology connector identified by the given (connector) id.
+ *
+ * @param id the id of the connector to unregister
+ * @return NOTE(review): presumably true if a connector with that id was
+ * registered and has been removed - confirm with the implementation
+ */
+ boolean unregisterOutgoingConnector(String id);
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistry.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistryImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistryImpl.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistryImpl.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistryImpl.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.ping;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.discovery.base.commons.ClusterViewService;
+import org.apache.sling.discovery.base.connectors.BaseConfig;
+import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of the ConnectorRegistry which
+ * keeps a list of outgoing connectors and is capable of
+ * pinging them.
+ */
+@Component
+@Service(value = ConnectorRegistry.class)
+public class ConnectorRegistryImpl implements ConnectorRegistry {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** A map of id-> topology connector clients currently registered/activate **/
+ private final Map<String, TopologyConnectorClient> outgoingClientsMap = new HashMap<String, TopologyConnectorClient>();
+
+ @Reference
+ private AnnouncementRegistry announcementRegistry;
+
+ @Reference
+ private BaseConfig config;
+
+ /** the local port is added to the announcement as the serverInfo object **/
+ private String port = "";
+
+ public static ConnectorRegistry testConstructor(AnnouncementRegistry announcementRegistry,
+ BaseConfig config) {
+ ConnectorRegistryImpl registry = new ConnectorRegistryImpl();
+ registry.announcementRegistry = announcementRegistry;
+ registry.config = config;
+ // Note that port is not set - but that is only for information purpose
+ // and not useful for testing
+ return registry;
+ }
+
+ @Activate
+ protected void activate(final ComponentContext cc) {
+ port = cc.getBundleContext().getProperty("org.osgi.service.http.port");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ synchronized (outgoingClientsMap) {
+ for (Iterator<TopologyConnectorClient> it = outgoingClientsMap.values().iterator(); it.hasNext();) {
+ final TopologyConnectorClient client = it.next();
+ client.disconnect();
+ it.remove();
+ }
+ }
+ }
+
+ public TopologyConnectorClientInformation registerOutgoingConnector(
+ final ClusterViewService clusterViewService, final URL connectorUrl) {
+ if (announcementRegistry == null) {
+ logger.error("registerOutgoingConnection: announcementRegistry is null");
+ return null;
+ }
+ TopologyConnectorClient client;
+ synchronized (outgoingClientsMap) {
+ for (Iterator<Entry<String, TopologyConnectorClient>> it = outgoingClientsMap
+ .entrySet().iterator(); it.hasNext();) {
+ Entry<String, TopologyConnectorClient> entry = it.next();
+ if (entry.getValue().getConnectorUrl().toExternalForm().equals(connectorUrl.toExternalForm())) {
+ it.remove();
+ logger.info("registerOutgoingConnection: re-registering connector: "+connectorUrl);
+ }
+ }
+ String serverInfo;
+ try {
+ serverInfo = InetAddress.getLocalHost().getCanonicalHostName()
+ + ":" + port;
+ } catch (Exception e) {
+ serverInfo = "localhost:" + port;
+ }
+ client = new TopologyConnectorClient(clusterViewService,
+ announcementRegistry, config, connectorUrl,
+ serverInfo);
+ outgoingClientsMap.put(client.getId(), client);
+ }
+ client.ping(false);
+ return client;
+ }
+
+ public Collection<TopologyConnectorClientInformation> listOutgoingConnectors() {
+ final List<TopologyConnectorClientInformation> result = new ArrayList<TopologyConnectorClientInformation>();
+ synchronized (outgoingClientsMap) {
+ result.addAll(outgoingClientsMap.values());
+ }
+ return result;
+ }
+
+ public boolean unregisterOutgoingConnector(final String id) {
+ if (id == null || id.length() == 0) {
+ throw new IllegalArgumentException("id must not be null");
+ }
+ synchronized (outgoingClientsMap) {
+ TopologyConnectorClient client = outgoingClientsMap.remove(id);
+ if (client != null) {
+ client.disconnect();
+ }
+ return client != null;
+ }
+ }
+
+ public void pingOutgoingConnectors(boolean force) {
+ List<TopologyConnectorClient> outgoingTemplatesClone;
+ synchronized (outgoingClientsMap) {
+ outgoingTemplatesClone = new ArrayList<TopologyConnectorClient>(
+ outgoingClientsMap.values());
+ }
+ for (Iterator<TopologyConnectorClient> it = outgoingTemplatesClone
+ .iterator(); it.hasNext();) {
+ it.next().ping(force);
+ }
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistryImpl.java
------------------------------------------------------------------------------
svn:eol-style = native