You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2015/10/20 16:12:34 UTC

svn commit: r1709601 [2/11] - in /sling/trunk/bundles/extensions/discovery: base/ base/src/ base/src/main/ base/src/main/java/ base/src/main/java/org/ base/src/main/java/org/apache/ base/src/main/java/org/apache/sling/ base/src/main/java/org/apache/sli...

Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/Announcement.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/Announcement.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/Announcement.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/Announcement.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,460 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.announcement;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.commons.providers.DefaultClusterView;
+import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription;
+import org.apache.sling.discovery.commons.providers.NonLocalInstanceDescription;
+
+/**
+ * An announcement is the information exchanged by the topology connector and
+ * contains all clusters and instances which both the topology connector client
+ * and servlet see (in their part before joining the two worlds).
+ * <p>
+ * An announcement is exchanged in json format and carries a timeout.
+ */
+public class Announcement {
+
+    /** the protocol version this announcement currently represents. Mismatching protocol versions are
+     * used to detect incompatible topology connectors
+     */
+    private final static int PROTOCOL_VERSION = 1;
+
+    /** the sling id of the owner of this announcement. the owner is where this announcement comes from **/
+    private final String ownerId;
+
+    /** announcement protocol version **/
+    private final int protocolVersion;
+
+    /** the local cluster view **/
+    private ClusterView localCluster;
+
+    /** the incoming instances **/
+    private List<Announcement> incomings = new LinkedList<Announcement>();
+
+    /** whether or not this annoucement was inherited (response of a connect) or incoming (the connect) **/
+    private boolean inherited = false;
+
+    /** some information about the server where this announcement came from **/
+    private String serverInfo;
+
+    /** whether or not this announcement represents a loop detected in the topology connectors **/
+    private boolean loop = false;
+
+    /** SLING-3382: Sets the backoffInterval which the connector servlets passes back to the client to use as the next heartbeatInterval **/
+    private long backoffInterval = -1;
+
+    /** SLING-3382: the resetBackoff flag is sent from client to server and indicates that the client wants to start from (backoff) scratch **/
+    private boolean resetBackoff = false;
+
+    public Announcement(final String ownerId) {
+        this(ownerId, PROTOCOL_VERSION);
+    }
+
+    public Announcement(final String ownerId, int protocolVersion) {
+        if (ownerId==null || ownerId.length()==0) {
+            throw new IllegalArgumentException("ownerId must not be null or empty");
+        }
+        this.ownerId = ownerId;
+        this.protocolVersion = protocolVersion;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder incomingList = new StringBuilder();
+        for (Iterator<Announcement> it = incomings.iterator(); it.hasNext();) {
+            Announcement anIncomingAnnouncement = it.next();
+            if (incomingList.length()!=0) {
+                incomingList.append(", ");
+            }
+            incomingList.append(anIncomingAnnouncement);
+        }
+        return "Announcement[ownerId="+getOwnerId()+
+                ", protocolVersion="+protocolVersion+
+                ", inherited="+isInherited()+
+                ", loop="+loop+
+                ", incomings="+incomingList+"]";
+    }
+
+    /** check whether this is announcement contains the valid protocol version **/
+    public boolean isCorrectVersion() {
+        return (protocolVersion==PROTOCOL_VERSION);
+    }
+
+    /** check whether this is a valid announcement, containing the minimal information **/
+    public boolean isValid() {
+        if (ownerId==null || ownerId.length()==0) {
+            return false;
+        }
+        if (loop) {
+            return true;
+        }
+        if (!isCorrectVersion()) {
+            return false;
+        }
+        if (localCluster==null) {
+            return false;
+        }
+        try{
+            List<InstanceDescription> instances = localCluster.getInstances();
+            if (instances==null || instances.size()==0) {
+                return false;
+            }
+            boolean isOwnerMemberOfLocalCluster = false;
+            for (Iterator<InstanceDescription> it = instances.iterator(); it.hasNext();) {
+                InstanceDescription instanceDescription = it.next();
+                if (instanceDescription.getSlingId().equals(ownerId)) {
+                    isOwnerMemberOfLocalCluster = true;
+                }
+            }
+            if (!isOwnerMemberOfLocalCluster) {
+                return false;
+            }
+        } catch(Exception ise) {
+            return false;
+        }
+        return true;
+    }
+
+    /** set the inherited flag - if true this means this announcement is the response of a topology connect **/
+    public void setInherited(final boolean inherited) {
+        this.inherited = inherited;
+    }
+
+    /** Returns the inherited flag - if true this means that this announcement is the response of a topology connect **/
+    public boolean isInherited() {
+        return inherited;
+    }
+
+    /** Sets the loop falg - set true when this announcement should represent a loop detected in the topology connectors **/
+    public void setLoop(final boolean loop) {
+        this.loop = loop;
+    }
+    
+    /** Sets the backoffInterval which the connector servlets passes back to the client to use as the next heartbeatInterval **/
+    public void setBackoffInterval(long backoffInterval) {
+        this.backoffInterval = backoffInterval;
+    }
+    
+    /** Gets the backoffInterval which the connector servlets passes back to the client to use as the next heartbeatInterval **/
+    public long getBackoffInterval() {
+        return this.backoffInterval;
+    }
+    
+    /** sets the resetBackoff flag **/
+    public void setResetBackoff(boolean resetBackoff) {
+        this.resetBackoff = resetBackoff;
+    }
+    
+    /** gets the resetBackoff flag **/
+    public boolean getResetBackoff() {
+        return resetBackoff;
+    }
+
+    /** Returns the loop flag - set when this announcement represents a loop detected in the topology connectors **/
+    public boolean isLoop() {
+        return loop;
+    }
+
+    /** Returns the protocolVersion of this announcement **/
+    public int getProtocolVersion() {
+        return protocolVersion;
+    }
+
+    /** sets the information about the server where this announcement came from **/
+    public void setServerInfo(final String serverInfo) {
+        this.serverInfo = serverInfo;
+    }
+
+    /** the information about the server where this announcement came from **/
+    public String getServerInfo() {
+        return serverInfo;
+    }
+
+    /**
+     * Returns the slingid of the owner of this announcement.
+     * <p>
+     * The owner is the instance which initiated the topology connection
+     */
+    public String getOwnerId() {
+        return ownerId;
+    }
+
+    /** Convert this announcement into a json object **/
+    public JSONObject asJSONObject() throws JSONException {
+        return asJSONObject(false);
+    }
+    
+    /** Convert this announcement into a json object **/
+    private JSONObject asJSONObject(boolean filterTimes) throws JSONException {
+        JSONObject announcement = new JSONObject();
+        announcement.put("ownerId", ownerId);
+        announcement.put("protocolVersion", protocolVersion);
+        // SLING-3389: leaving the 'created' property in the announcement
+        // for backwards compatibility!
+        if (!filterTimes) {
+            announcement.put("created", System.currentTimeMillis());
+        }
+        announcement.put("inherited", inherited);
+        if (loop) {
+            announcement.put("loop", loop);
+        }
+        if (serverInfo != null) {
+            announcement.put("serverInfo", serverInfo);
+        }
+        if (localCluster!=null) {
+            announcement.put("localClusterView", asJSON(localCluster));
+        }
+        if (!filterTimes && backoffInterval>0) {
+            announcement.put("backoffInterval", backoffInterval);
+        }
+        if (resetBackoff) {
+            announcement.put("resetBackoff", resetBackoff);
+        }
+        JSONArray incomingAnnouncements = new JSONArray();
+        for (Iterator<Announcement> it = incomings.iterator(); it.hasNext();) {
+            Announcement incoming = it.next();
+            incomingAnnouncements.put(incoming.asJSONObject(filterTimes));
+        }
+        announcement.put("topologyAnnouncements", incomingAnnouncements);
+        return announcement;
+    }
+
+    /** Create an announcement form json **/
+    public static Announcement fromJSON(final String topologyAnnouncementJSON)
+            throws JSONException {
+        JSONObject announcement = new JSONObject(topologyAnnouncementJSON);
+        final String ownerId = announcement.getString("ownerId");
+        final int protocolVersion;
+        if (!announcement.has("protocolVersion")) {
+            protocolVersion = -1;
+        } else {
+            protocolVersion = announcement.getInt("protocolVersion");
+        }
+        final Announcement result = new Announcement(ownerId, protocolVersion);
+        if (announcement.has("backoffInterval")) {
+            long backoffInterval = announcement.getLong("backoffInterval");
+            result.backoffInterval = backoffInterval;
+        }
+        if (announcement.has("resetBackoff")) {
+            boolean resetBackoff = announcement.getBoolean("resetBackoff");
+            result.resetBackoff = resetBackoff;
+        }
+        if (announcement.has("loop") && announcement.getBoolean("loop")) {
+            result.setLoop(true);
+            return result;
+        }
+        final String localClusterViewJSON = announcement
+                .getString("localClusterView");
+        final ClusterView localClusterView = asClusterView(localClusterViewJSON);
+        final JSONArray subAnnouncements = announcement
+                .getJSONArray("topologyAnnouncements");
+
+        if (announcement.has("inherited")) {
+            final Boolean inherited = announcement.getBoolean("inherited");
+            result.inherited = inherited;
+        }
+        if (announcement.has("serverInfo")) {
+            String serverInfo = announcement.getString("serverInfo");
+            result.serverInfo = serverInfo;
+        }
+        result.setLocalCluster(localClusterView);
+        for (int i = 0; i < subAnnouncements.length(); i++) {
+            String subAnnouncementJSON = subAnnouncements.getString(i);
+            result.addIncomingTopologyAnnouncement(fromJSON(subAnnouncementJSON));
+        }
+        return result;
+    }
+
+    /** create a clusterview from json **/
+    private static ClusterView asClusterView(final String localClusterViewJSON)
+            throws JSONException {
+        JSONObject obj = new JSONObject(localClusterViewJSON);
+        DefaultClusterView clusterView = new DefaultClusterView(
+                obj.getString("id"));
+        JSONArray instancesObj = obj.getJSONArray("instances");
+
+        for (int i = 0; i < instancesObj.length(); i++) {
+            JSONObject anInstance = instancesObj.getJSONObject(i);
+            clusterView.addInstanceDescription(asInstance(anInstance));
+        }
+
+        return clusterView;
+    }
+
+    /** convert a clusterview into json **/
+    private static JSONObject asJSON(final ClusterView clusterView)
+            throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put("id", clusterView.getId());
+        JSONArray instancesObj = new JSONArray();
+        List<InstanceDescription> instances = clusterView.getInstances();
+        for (Iterator<InstanceDescription> it = instances.iterator(); it
+                .hasNext();) {
+            InstanceDescription instanceDescription = it.next();
+            instancesObj.put(asJSON(instanceDescription));
+        }
+        obj.put("instances", instancesObj);
+        return obj;
+    }
+
+    /** create an instancedescription from json **/
+    private static DefaultInstanceDescription asInstance(
+            final JSONObject anInstance) throws JSONException {
+        final boolean isLeader = anInstance.getBoolean("isLeader");
+        final String slingId = anInstance.getString("slingId");
+
+        final JSONObject propertiesObj = anInstance.getJSONObject("properties");
+        Iterator<String> it = propertiesObj.keys();
+        Map<String, String> properties = new HashMap<String, String>();
+        while (it.hasNext()) {
+            String key = it.next();
+            properties.put(key, propertiesObj.getString(key));
+        }
+
+        NonLocalInstanceDescription instance = new NonLocalInstanceDescription(
+                null, isLeader, slingId, properties);
+        return instance;
+    }
+
+    /** convert an instance description into a json object **/
+    private static JSONObject asJSON(final InstanceDescription instanceDescription)
+            throws JSONException {
+        JSONObject obj = new JSONObject();
+        obj.put("slingId", instanceDescription.getSlingId());
+        obj.put("isLeader", instanceDescription.isLeader());
+        ClusterView cluster = instanceDescription.getClusterView();
+        if (cluster != null) {
+            obj.put("cluster", cluster.getId());
+        }
+        JSONObject propertiesObj = new JSONObject();
+        Map<String, String> propertiesMap = instanceDescription.getProperties();
+        for (Iterator<Entry<String, String>> it = propertiesMap.entrySet()
+                .iterator(); it.hasNext();) {
+            Entry<String, String> entry = it.next();
+            propertiesObj.put(entry.getKey(), entry.getValue());
+        }
+        obj.put("properties", propertiesObj);
+        return obj;
+    }
+
+    /** sets the local clusterview **/
+    public void setLocalCluster(ClusterView localCluster) {
+        this.localCluster = localCluster;
+    }
+
+    /** adds an incoming announcement to this announcement **/
+    public void addIncomingTopologyAnnouncement(
+            Announcement incomingTopologyAnnouncement) {
+        incomings.add(incomingTopologyAnnouncement);
+    }
+
+    /** Convert this announcement into json **/
+    public String asJSON() throws JSONException {
+        return asJSONObject().toString();
+    }
+
+    /** the key which is unique to this announcement **/
+    public String getPrimaryKey() {
+        return ownerId;
+    }
+
+    /** Returns the list of instances that are contained in this announcement **/
+    public Collection<InstanceDescription> listInstances() {
+        Collection<InstanceDescription> instances = new LinkedList<InstanceDescription>();
+        instances.addAll(localCluster.getInstances());
+
+        for (Iterator<Announcement> it = incomings.iterator(); it.hasNext();) {
+            Announcement incomingAnnouncement = it.next();
+            instances.addAll(incomingAnnouncement.listInstances());
+        }
+        return instances;
+    }
+
+    /**
+     * Persists this announcement using the given 'announcements' resource,
+     * under which a node with the primary key is created
+     **/
+    public void persistTo(Resource announcementsResource)
+            throws PersistenceException, JSONException {
+        Resource announcementChildResource = announcementsResource.getChild(getPrimaryKey());
+        
+        // SLING-2967 used to introduce 'resetting the created time' here
+        // in order to become machine-clock independent.
+        // With introduction of SLING-3389, where we dont store any
+        // announcement-heartbeat-dates anymore at all, this resetting here
+        // became unnecessary.
+        
+        final String announcementJson = asJSON();
+		if (announcementChildResource==null) {
+            final ResourceResolver resourceResolver = announcementsResource.getResourceResolver();
+            Map<String, Object> properties = new HashMap<String, Object>();
+            properties.put("topologyAnnouncement", announcementJson);
+            resourceResolver.create(announcementsResource, getPrimaryKey(), properties);
+        } else {
+            final ModifiableValueMap announcementChildMap = announcementChildResource.adaptTo(ModifiableValueMap.class);
+            announcementChildMap.put("topologyAnnouncement", announcementJson);
+        }
+    }
+
+	/**
+     * Remove all announcements that match the given owner Id
+     */
+    public void removeInherited(final String ownerId) {
+        for (Iterator<Announcement> it = incomings.iterator(); it.hasNext();) {
+            Announcement anIncomingAnnouncement = it.next();
+            if (anIncomingAnnouncement.isInherited()
+                    && anIncomingAnnouncement.getOwnerId().equals(ownerId)) {
+                // then filter this
+                it.remove();
+            }
+
+        }
+    }
+
+    /**
+     * Compare this Announcement with another one, ignoring the 'created'
+     * property - which gets added to the JSON object automatically due
+     * to SLING-3389 wire-backwards-compatibility - and backoffInterval
+     * introduced as part of SLING-3382
+     */
+    public boolean correspondsTo(Announcement announcement) throws JSONException {
+        final JSONObject myJson = asJSONObject(true);
+        final JSONObject otherJson = announcement.asJSONObject(true);
+        return myJson.toString().equals(otherJson.toString());
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/Announcement.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementFilter.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementFilter.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementFilter.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementFilter.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.announcement;
+
+/**
+ * Filter used during announcement processing internally 
+ **/
+public interface AnnouncementFilter {
+
+    /**
+     * Check if the provided announcement, which was received by the provided
+     * slingId can be accepted or not.
+     **/
+    boolean accept(String receivingSlingId, Announcement announcement);
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistry.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistry.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistry.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistry.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.announcement;
+
+import java.util.Collection;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+
+/**
+ * The announcement registry keeps track of all the announcement that this
+ * instance either received by a joined topology connector or that a topology
+ * connector inherited from the counterpart (the topology connector servlet)
+ */
+public interface AnnouncementRegistry {
+
+    /** 
+     * Register the given announcement - and returns the backoff interval (in seconds)
+     * for stable connectors
+     * - or -1 if the registration was not successful (likely indicating a loop) 
+     * @return the backoff interval (in seconds) for stable connectors
+     * - or -1 if the registration was not successful (likely indicating a loop) 
+     */
+    long registerAnnouncement(Announcement topologyAnnouncement);
+    
+    /** list all announcements that were received by instances in the local cluster **/
+    Collection<Announcement> listAnnouncementsInSameCluster(ClusterView localClusterView);
+    
+    /** list all announcements that were received (incoming or inherited) by this instance **/
+    Collection<Announcement> listLocalAnnouncements();
+    
+    /** list all announcements that this instance received (incoming) **/ 
+    Collection<CachedAnnouncement> listLocalIncomingAnnouncements();
+    
+    /** Check for expired announcements and remove any if applicable **/
+    void checkExpiredAnnouncements();
+
+    /** Returns the list of instances contained in all non-expired announcements of this registry **/
+    Collection<InstanceDescription> listInstances(ClusterView localClusterView);
+
+    /** Add all registered announcements to the given target announcement that are accepted by the given filter **/
+    void addAllExcept(Announcement target, ClusterView localClusterView, AnnouncementFilter filter);
+
+    /** Unregister the announcement owned by the given slingId **/
+    void unregisterAnnouncement(String ownerId);
+
+    /** Whether or not the given owner has an active (ie not expired) announcement registered **/
+    boolean hasActiveAnnouncement(String ownerId);
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistryImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistryImpl.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistryImpl.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistryImpl.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,593 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.announcement;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.base.connectors.BaseConfig;
+import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
+import org.apache.sling.settings.SlingSettingsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of the AnnouncementRegistry which
+ * handles JSON-backed announcements and does so by storing
+ * them in a local like /var/discovery/impl/clusterNodes/$slingId/announcement.
+ */
+@Component
+@Service(value = AnnouncementRegistry.class)
+public class AnnouncementRegistryImpl implements AnnouncementRegistry {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    @Reference
+    private ResourceResolverFactory resourceResolverFactory;
+
+    @Reference
+    private SlingSettingsService settingsService;
+
+    private String slingId;
+    
+    @Reference
+    private BaseConfig config;
+    
+    public static AnnouncementRegistryImpl testConstructorAndActivate(ResourceResolverFactory resourceResolverFactory,
+            SlingSettingsService slingSettingsService, BaseConfig config) {
+        AnnouncementRegistryImpl registry = testConstructor(resourceResolverFactory, slingSettingsService, config);
+        registry.activate();
+        return registry;
+    }
+    
+    public static AnnouncementRegistryImpl testConstructor(ResourceResolverFactory resourceResolverFactory,
+            SlingSettingsService slingSettingsService, BaseConfig config) {
+        AnnouncementRegistryImpl registry = new AnnouncementRegistryImpl();
+        registry.resourceResolverFactory = resourceResolverFactory;
+        registry.settingsService = slingSettingsService;
+        registry.config = config;
+        return registry;
+    }
+    
+    @Activate
+    protected void activate() {
+        slingId = settingsService.getSlingId();
+    }
+    
+    private final Map<String,CachedAnnouncement> ownAnnouncementsCache = 
+            new HashMap<String,CachedAnnouncement>();
+
+    public synchronized void unregisterAnnouncement(final String ownerId) {
+        if (ownerId==null || ownerId.length()==0) {
+            throw new IllegalArgumentException("ownerId must not be null or empty");
+        }
+        // remove from the cache - even if there's an error afterwards
+        ownAnnouncementsCache.remove(ownerId);
+        
+        if (resourceResolverFactory == null) {
+            logger.error("unregisterAnnouncement: resourceResolverFactory is null");
+            return;
+        }
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = resourceResolverFactory
+                    .getAdministrativeResourceResolver(null);
+
+            final String path = config.getClusterInstancesPath()
+                    + "/"
+                    + slingId
+                    + "/announcements/" + ownerId;
+            final Resource announcementsResource = resourceResolver.getResource(path);
+            if (announcementsResource!=null) {
+                resourceResolver.delete(announcementsResource);
+                resourceResolver.commit();
+            }
+
+        } catch (LoginException e) {
+            logger.error(
+                    "unregisterAnnouncement: could not log in administratively: "
+                            + e, e);
+            throw new RuntimeException("Could not log in to repository (" + e
+                    + ")", e);
+        } catch (PersistenceException e) {
+            logger.error("unregisterAnnouncement: got a PersistenceException: "
+                    + e, e);
+            throw new RuntimeException(
+                    "Exception while talking to repository (" + e + ")", e);
+        } finally {
+            if (resourceResolver != null) {
+                resourceResolver.close();
+            }
+        }
+    }
+
+    public synchronized Collection<Announcement> listLocalAnnouncements() {
+        return fillWithCachedAnnouncements(new LinkedList<Announcement>());
+    }
+    
+    public synchronized Collection<CachedAnnouncement> listLocalIncomingAnnouncements() {
+        Collection<CachedAnnouncement> result = new LinkedList<CachedAnnouncement>(ownAnnouncementsCache.values());
+        for (Iterator<CachedAnnouncement> it = result.iterator(); it.hasNext();) {
+            CachedAnnouncement cachedAnnouncement = it.next();
+            if (cachedAnnouncement.getAnnouncement().isInherited()) {
+                it.remove();
+                continue;
+            }
+            if (cachedAnnouncement.hasExpired()) {
+                it.remove();
+                continue;
+            }
+        }
+        return result;
+    }
+    
+    private final InstanceDescription getLocalInstanceDescription(final ClusterView localClusterView) {
+        for (Iterator<InstanceDescription> it = localClusterView.getInstances().iterator(); it
+                .hasNext();) {
+            InstanceDescription id = it.next();
+            if (id.isLocal()) {
+                return id;
+            }
+        }
+        return null;
+    }
+
+    public synchronized Collection<Announcement> listAnnouncementsInSameCluster(final ClusterView localClusterView) {
+        logger.debug("listAnnouncementsInSameCluster: start. localClusterView: {}", localClusterView);
+        if (localClusterView==null) {
+            throw new IllegalArgumentException("clusterView must not be null");
+        }
+        ResourceResolver resourceResolver = null;
+        final Collection<Announcement> incomingAnnouncements = new LinkedList<Announcement>();
+        final InstanceDescription localInstance = getLocalInstanceDescription(localClusterView);
+        try {
+            resourceResolver = resourceResolverFactory
+                    .getAdministrativeResourceResolver(null);
+
+            Resource clusterInstancesResource = ResourceHelper
+                    .getOrCreateResource(
+                            resourceResolver,
+                            config.getClusterInstancesPath());
+
+            Iterator<Resource> it0 = clusterInstancesResource.getChildren()
+                    .iterator();
+            while (it0.hasNext()) {
+                Resource aClusterInstanceResource = it0.next();
+                final String instanceId = aClusterInstanceResource.getName();
+                logger.debug("listAnnouncementsInSameCluster: handling clusterInstance: {}", instanceId);
+                if (localInstance!=null && localInstance.getSlingId().equals(instanceId)) {
+                    // this is the local instance then - which we serve from the cache only
+                    logger.debug("listAnnouncementsInSameCluster: matched localInstance, filling with cache: {}", instanceId);
+                    fillWithCachedAnnouncements(incomingAnnouncements);
+                    continue;
+                }
+                
+                //TODO: add ClusterView.contains(instanceSlingId) for convenience to next api change
+                if (!contains(localClusterView, instanceId)) {
+                    logger.debug("listAnnouncementsInSameCluster: instance is not in my view, ignoring: {}", instanceId);
+                    // then the instance is not in my view, hence ignore its announcements
+                    // (corresponds to earlier expiry-handling)
+                    continue;
+                }
+                final Resource announcementsResource = aClusterInstanceResource
+                        .getChild("announcements");
+                if (announcementsResource == null) {
+                    logger.debug("listAnnouncementsInSameCluster: instance has no announcements: {}", instanceId);
+                    continue;
+                }
+                logger.debug("listAnnouncementsInSameCluster: instance has announcements: {}", instanceId);
+                Iterator<Resource> it = announcementsResource.getChildren()
+                        .iterator();
+                Announcement topologyAnnouncement;
+                while (it.hasNext()) {
+                    Resource anAnnouncement = it.next();
+                    topologyAnnouncement = Announcement
+                            .fromJSON(anAnnouncement
+                                    .adaptTo(ValueMap.class).get(
+                                            "topologyAnnouncement",
+                                            String.class));
+                    logger.debug("listAnnouncementsInSameCluster: found announcement: {}", topologyAnnouncement);
+                    incomingAnnouncements.add(topologyAnnouncement);
+                    // SLING-3389: no longer check for expired announcements - 
+                    // instead make use of the fact that this instance
+                    // has a clusterView and that every live instance
+                    // is responsible of cleaning up expired announcements
+                    // with the repository
+                }
+            }
+            // since SLING-3389 this method does only read operations, hence
+            // no commit necessary anymore - close happens in below finally block
+        } catch (LoginException e) {
+            logger.error(
+                    "listAnnouncementsInSameCluster: could not log in administratively: " + e, e);
+            throw new RuntimeException("Could not log in to repository (" + e
+                    + ")", e);
+        } catch (PersistenceException e) {
+            logger.error("listAnnouncementsInSameCluster: got a PersistenceException: " + e, e);
+            throw new RuntimeException(
+                    "Exception while talking to repository (" + e + ")", e);
+        } catch (JSONException e) {
+            logger.error("listAnnouncementsInSameCluster: got a JSONException: " + e, e);
+            throw new RuntimeException("Exception while converting json (" + e
+                    + ")", e);
+        } finally {
+            if (resourceResolver != null) {
+                resourceResolver.close();
+            }
+        }
+    	if (logger.isDebugEnabled()) {
+    		logger.debug("listAnnouncementsInSameCluster: result: "+incomingAnnouncements.size());
+    	}
+        return incomingAnnouncements;
+    }
+    
+    private final Collection<Announcement> fillWithCachedAnnouncements(
+            final Collection<Announcement> incomingAnnouncements) {
+        for (Iterator<Entry<String, CachedAnnouncement>> it = ownAnnouncementsCache.entrySet().iterator(); it
+                .hasNext();) {
+            final Entry<String, CachedAnnouncement> entry = it.next();
+            if (entry.getValue().hasExpired()) {
+                // filter this one out then
+                continue;
+            }
+            incomingAnnouncements.add(entry.getValue().getAnnouncement());
+        }
+        return incomingAnnouncements;
+    }
+
+    private final boolean contains(final ClusterView clusterView, final String instanceId) {
+        for (Iterator<InstanceDescription> it = clusterView.getInstances().iterator(); it
+                .hasNext();) {
+            InstanceDescription instance = it.next();
+            if (instance.getSlingId().equals(instanceId)) {
+                // fine, then the instance is in the view
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public synchronized boolean hasActiveAnnouncement(final String ownerId) {
+        if (ownerId==null || ownerId.length()==0) {
+            throw new IllegalArgumentException("ownerId must not be null or empty: "+ownerId);
+        }
+        final CachedAnnouncement cachedAnnouncement = ownAnnouncementsCache.get(ownerId);
+        if (cachedAnnouncement==null) {
+            return false;
+        }
+        
+        return !cachedAnnouncement.hasExpired();
+    }
+
+    public synchronized long registerAnnouncement(final Announcement topologyAnnouncement) {
+        if (topologyAnnouncement==null) {
+            throw new IllegalArgumentException("topologyAnnouncement must not be null");
+        }
+        if (!topologyAnnouncement.isValid()) {
+            logger.warn("topologyAnnouncement is not valid");
+            return -1;
+        }
+        if (resourceResolverFactory == null) {
+            logger.error("registerAnnouncement: resourceResolverFactory is null");
+            return -1;
+        }
+        
+        final CachedAnnouncement cachedAnnouncement = 
+                ownAnnouncementsCache.get(topologyAnnouncement.getOwnerId());
+        if (cachedAnnouncement!=null) {
+            if (logger.isDebugEnabled()) {
+                logger.debug("registerAnnouncement: got existing cached announcement for ownerId="+topologyAnnouncement.getOwnerId());
+            }
+            try{
+                if (topologyAnnouncement.correspondsTo(cachedAnnouncement.getAnnouncement())) {
+                    // then nothing has changed with this announcement, so just update
+                    // the heartbeat and fine is.
+                    // this should actually be the normal case for a stable connector
+                    logger.debug("registerAnnouncement: nothing has changed, only updating heartbeat in-memory.");
+                    return cachedAnnouncement.registerPing(topologyAnnouncement, config);
+                }
+                logger.debug("registerAnnouncement: incoming announcement differs from existing one!");
+                
+            } catch(JSONException e) {
+                logger.error("registerAnnouncement: got JSONException while converting incoming announcement to JSON: "+e, e);
+            }
+            // otherwise the repository and the cache require to be updated
+            // resetting the cache therefore at this point already
+            ownAnnouncementsCache.remove(topologyAnnouncement.getOwnerId());
+        } else {
+            logger.debug("registerAnnouncement: no cached announcement yet for ownerId="+topologyAnnouncement.getOwnerId());
+        }
+
+        logger.debug("registerAnnouncement: getting the list of all local announcements");
+        final Collection<Announcement> announcements = new LinkedList<Announcement>();
+        fillWithCachedAnnouncements(announcements);
+        if (logger.isDebugEnabled()) {
+            logger.debug("registerAnnouncement: list returned: "+(announcements==null ? "null" : announcements.size()));
+        }
+        for (Iterator<Announcement> it1 = announcements.iterator(); it1
+                .hasNext();) {
+            Announcement announcement = it1.next();
+            if (announcement.getOwnerId().equals(
+                    topologyAnnouncement.getOwnerId())) {
+                // then this is from the same owner - skip this
+                continue;
+            }
+            // analyse to see if any of the instances in the announcement
+            // include the new owner
+            Collection<InstanceDescription> attachedInstances = announcement
+                    .listInstances();
+            for (Iterator<InstanceDescription> it2 = attachedInstances
+                    .iterator(); it2.hasNext();) {
+                InstanceDescription instanceDescription = it2.next();
+                if (topologyAnnouncement.getOwnerId().equals(
+                        instanceDescription.getSlingId())) {
+                    logger.info("registerAnnouncement: already have this instance attached: "
+                            + instanceDescription.getSlingId());
+                    return -1;
+                }
+            }
+        }
+
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = resourceResolverFactory
+                    .getAdministrativeResourceResolver(null);
+
+            final Resource announcementsResource = ResourceHelper
+                    .getOrCreateResource(
+                            resourceResolver,
+                            config.getClusterInstancesPath()
+                                    + "/"
+                                    + slingId
+                                    + "/announcements");
+
+            topologyAnnouncement.persistTo(announcementsResource);
+            resourceResolver.commit();
+            ownAnnouncementsCache.put(topologyAnnouncement.getOwnerId(), 
+                    new CachedAnnouncement(topologyAnnouncement, config));
+        } catch (LoginException e) {
+            logger.error(
+                    "registerAnnouncement: could not log in administratively: "
+                            + e, e);
+            throw new RuntimeException("Could not log in to repository (" + e
+                    + ")", e);
+        } catch (PersistenceException e) {
+            logger.error("registerAnnouncement: got a PersistenceException: "
+                    + e, e);
+            throw new RuntimeException(
+                    "Exception while talking to repository (" + e + ")", e);
+        } catch (JSONException e) {
+            logger.error("registerAnnouncement: got a JSONException: " + e, e);
+            throw new RuntimeException("Exception while converting json (" + e
+                    + ")", e);
+        } finally {
+            if (resourceResolver != null) {
+                resourceResolver.close();
+            }
+        }
+        return 0;
+    }
+
+    public synchronized void addAllExcept(final Announcement target, final ClusterView clusterView, 
+            final AnnouncementFilter filter) {
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = resourceResolverFactory
+                    .getAdministrativeResourceResolver(null);
+
+            final Resource clusterInstancesResource = ResourceHelper
+                    .getOrCreateResource(
+                            resourceResolver,
+                            config.getClusterInstancesPath());
+
+            final Iterator<Resource> it0 = clusterInstancesResource.getChildren()
+                    .iterator();
+            Resource announcementsResource;
+            while (it0.hasNext()) {
+                final Resource aClusterInstanceResource = it0.next();
+                final String instanceId = aClusterInstanceResource.getName();
+                //TODO: add ClusterView.contains(instanceSlingId) for convenience to next api change
+                if (!contains(clusterView, instanceId)) {
+                    // then the instance is not in my view, hence dont propagate
+                    // its announcements
+                    // (corresponds to earlier expiry-handling)
+                    continue;
+                }
+                announcementsResource = aClusterInstanceResource
+                        .getChild("announcements");
+                if (announcementsResource == null) {
+                    continue;
+                }
+                Iterator<Resource> it = announcementsResource.getChildren()
+                        .iterator();
+                while (it.hasNext()) {
+                    Resource anAnnouncement = it.next();
+                	if (logger.isDebugEnabled()) {
+	                    logger.debug("addAllExcept: anAnnouncement="
+	                            + anAnnouncement);
+                	}
+                    Announcement topologyAnnouncement;
+                    topologyAnnouncement = Announcement.fromJSON(anAnnouncement
+                            .adaptTo(ValueMap.class).get(
+                                    "topologyAnnouncement", String.class));
+                    if (filter != null && !filter.accept(aClusterInstanceResource.getName(), topologyAnnouncement)) {
+                        continue;
+                    }
+                    target.addIncomingTopologyAnnouncement(topologyAnnouncement);
+                }
+            }
+            // even before SLING-3389 this method only did read operations,
+            // hence no commit was ever necessary. The close happens in the finally block
+        } catch (LoginException e) {
+            logger.error(
+                    "handleEvent: could not log in administratively: " + e, e);
+            throw new RuntimeException("Could not log in to repository (" + e
+                    + ")", e);
+        } catch (PersistenceException e) {
+            logger.error("handleEvent: got a PersistenceException: " + e, e);
+            throw new RuntimeException(
+                    "Exception while talking to repository (" + e + ")", e);
+        } catch (JSONException e) {
+            logger.error("handleEvent: got a JSONException: " + e, e);
+            throw new RuntimeException("Exception while converting json (" + e
+                    + ")", e);
+        } finally {
+            if (resourceResolver != null) {
+                resourceResolver.close();
+            }
+        }
+    }
+
+    public synchronized void checkExpiredAnnouncements() {
+        for (Iterator<Entry<String, CachedAnnouncement>> it = 
+                ownAnnouncementsCache.entrySet().iterator(); it.hasNext();) {
+            final Entry<String, CachedAnnouncement> entry = it.next();
+            if (entry.getValue().hasExpired()) {
+                // then we have an expiry
+                it.remove();
+                
+                final String instanceId = entry.getKey();
+                logger.info("checkExpiredAnnouncements: topology connector of "+instanceId+
+                        " (to me="+slingId+
+                        ", inherited="+entry.getValue().getAnnouncement().isInherited()+") has expired.");
+                deleteAnnouncementsOf(instanceId);
+            }
+        }
+        //SLING-4139 : also make sure there are no stale announcements
+        //             in the repository (from a crash or any other action).
+        //             The ownAnnouncementsCache is the authorative set
+        //             of announcements that are registered to this
+        //             instance's registry - and the repository must not
+        //             contain any additional announcements
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = resourceResolverFactory
+                    .getAdministrativeResourceResolver(null);
+            final Resource announcementsResource = ResourceHelper
+                    .getOrCreateResource(
+                            resourceResolver,
+                            config.getClusterInstancesPath()
+                                    + "/"
+                                    + slingId
+                                    + "/announcements");
+            final Iterator<Resource> it = announcementsResource.getChildren().iterator();
+            while(it.hasNext()) {
+            	final Resource res = it.next();
+            	final String ownerId = res.getName();
+            	// ownerId is the slingId of the owner of the announcement (ie of the peer of the connector).
+            	// let's check if the we have that owner's announcement in the cache
+            	
+            	if (ownAnnouncementsCache.containsKey(ownerId)) {
+            		// fine then, we'll leave this announcement untouched
+            		continue;
+            	}
+            	// otherwise this announcement is likely from an earlier incarnation
+            	// of this instance - hence stale - hence we must remove it now
+            	//  (SLING-4139)
+            	ResourceHelper.deleteResource(resourceResolver, 
+            			res.getPath());
+            }
+            resourceResolver.commit();
+            resourceResolver.close();
+            resourceResolver = null;
+        } catch (LoginException e) {
+            logger.error(
+                    "checkExpiredAnnouncements: could not log in administratively when checking "
+                    + "for expired announcements of slingId="+slingId+": " + e, e);
+        } catch (PersistenceException e) {
+            logger.error(
+                    "checkExpiredAnnouncements: got PersistenceException when checking "
+                    + "for expired announcements of slingId="+slingId+": " + e, e);
+        } finally {
+            if (resourceResolver!=null) {
+                resourceResolver.revert();
+                resourceResolver.close();
+                resourceResolver = null;
+            }
+        }
+    }
+
+    private final void deleteAnnouncementsOf(final String instanceId) {
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = resourceResolverFactory
+                    .getAdministrativeResourceResolver(null);
+            ResourceHelper.deleteResource(resourceResolver, 
+                    config.getClusterInstancesPath()
+                                + "/"
+                                + slingId
+                                + "/announcements/"
+                                + instanceId);
+            resourceResolver.commit();
+            resourceResolver.close();
+            resourceResolver = null;
+        } catch (LoginException e) {
+            logger.error(
+                    "deleteAnnouncementsOf: could not log in administratively when deleting "
+                    + "announcements of instanceId="+instanceId+": " + e, e);
+        } catch (PersistenceException e) {
+            logger.error(
+                    "deleteAnnouncementsOf: got PersistenceException when deleting "
+                    + "announcements of instanceId="+instanceId+": " + e, e);
+        } finally {
+            if (resourceResolver!=null) {
+                resourceResolver.revert();
+                resourceResolver.close();
+                resourceResolver = null;
+            }
+        }
+    }
+
+    public synchronized Collection<InstanceDescription> listInstances(final ClusterView localClusterView) {
+        logger.debug("listInstances: start. localClusterView: {}", localClusterView);
+        final Collection<InstanceDescription> instances = new LinkedList<InstanceDescription>();
+
+        final Collection<Announcement> announcements = listAnnouncementsInSameCluster(localClusterView);
+        if (announcements == null) {
+            logger.debug("listInstances: no announcement found. end. instances: {}", instances);
+            return instances;
+        }
+
+        for (Iterator<Announcement> it = announcements.iterator(); it.hasNext();) {
+            final Announcement announcement = it.next();
+            logger.debug("listInstances: adding announcement: {}", announcement);
+            instances.addAll(announcement.listInstances());
+        }
+        logger.debug("listInstances: announcements added. end. instances: {}", instances);
+        return instances;
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/AnnouncementRegistryImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/CachedAnnouncement.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/CachedAnnouncement.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/CachedAnnouncement.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/CachedAnnouncement.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.announcement;
+
+import org.apache.sling.discovery.base.connectors.BaseConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * With SLING-3389 the Announcement itself doesn't use the created
+ * (ie timeout) field anymore (it still has it currently for backwards
+ * compatibility on the wire-level) - hence that's why there's this
+ * small in-memory wrapper object which contains an Announcement and 
+ * carries a lastHeartbeat property.
+ */
+public class CachedAnnouncement {
+    
+    private final static Logger logger = LoggerFactory.getLogger(CachedAnnouncement.class);
+
+    private long lastPing = System.currentTimeMillis();
+
+    private final Announcement announcement;
+    
+    private long firstPing = System.currentTimeMillis();
+
+    private long backoffIntervalSeconds = -1;
+
+    private final BaseConfig config;
+    
+    CachedAnnouncement(final Announcement announcement, final BaseConfig config) {
+        this.announcement = announcement;
+        this.config = config;
+    }
+    
+    private long getConfiguredConnectorTimeout() {
+        return config.getConnectorPingTimeout();
+    }
+    
+    private long getConfiguredConnectorInterval() {
+        return config.getConnectorPingInterval();
+    }
+
+    public final boolean hasExpired() {
+        final long now = System.currentTimeMillis();
+        final long diff = now-lastPing;
+        if (diff<1000*getEffectiveHeartbeatTimeout()) {
+            return false;
+        } else {
+            return true;
+        }
+    }
+    
+    public final long getLastPing() {
+        return lastPing;
+    }
+    
+    /** Returns the second until the next heartbeat is expected, otherwise the timeout will hit **/
+    public final long getSecondsUntilTimeout() {
+        final long now = System.currentTimeMillis();
+        final long diff = now-lastPing;
+        final long left = 1000*getEffectiveHeartbeatTimeout() - diff;
+        return left/1000;
+    }
+    
+    
+    private final long getEffectiveHeartbeatTimeout() {
+        final long configuredGoodwill = getConfiguredConnectorTimeout() - getConfiguredConnectorInterval();
+        return Math.max(getConfiguredConnectorTimeout(), backoffIntervalSeconds + configuredGoodwill);
+    }
+
+    /** Registers a heartbeat event, and returns the new resulting backoff interval -
+     * or 0 if no backoff is applicable yet.
+     * @param incomingAnnouncement 
+     * @return the new resulting backoff interval -
+     * or 0 if no backoff is applicable yet.
+     */
+    final long registerPing(Announcement incomingAnnouncement, BaseConfig config) {
+        lastPing = System.currentTimeMillis();
+        if (incomingAnnouncement.isInherited()) {
+            // then we are the client, we inherited this announcement from the server
+            // hence we have no power to do any backoff instructions towards the server
+            // (since the server decides about backoff-ing). hence returning 0 here
+            // but taking note of what the server instructed us in terms of backoff
+            backoffIntervalSeconds = incomingAnnouncement.getBackoffInterval();
+            logger.debug("registerPing: inherited announcement - hence returning 0");
+            return 0;
+        }
+        if (incomingAnnouncement.getResetBackoff()) {
+            // on resetBackoff we reset the firstHeartbeat and start 
+            // from 0 again
+            firstPing = lastPing;
+            logger.debug("registerPing: got a resetBackoff - hence returning 0");
+            return 0;
+        }
+        final long stableSince = lastPing - firstPing;
+        final long numStableTimeouts = stableSince / (1000 * config.getConnectorPingTimeout());
+        final long backoffFactor = Math.min(numStableTimeouts, config.getBackoffStableFactor());
+        backoffIntervalSeconds = backoffFactor * config.getConnectorPingInterval();
+        return backoffIntervalSeconds;
+    }
+
+    public final Announcement getAnnouncement() {
+        return announcement;
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/CachedAnnouncement.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/package-info.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/package-info.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/package-info.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/package-info.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides topology announcement implementations for discovery
+ * implementors that choose to extend from discovery.base
+ *
+ * @version 1.0.0
+ */
+@Version("1.0.0")
+package org.apache.sling.discovery.base.connectors.announcement;
+
+import aQute.bnd.annotation.Version;
+

Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/announcement/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/package-info.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/package-info.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/package-info.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/package-info.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Provides topology connector related classes for discovery
+ * implementors that choose to extend from discovery.base
+ *
+ * @version 1.0.0
+ */
+@Version("1.0.0")
+package org.apache.sling.discovery.base.connectors;
+
+import aQute.bnd.annotation.Version;
+

Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/package-info.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistry.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistry.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistry.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistry.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.ping;
+
+import java.net.URL;
+import java.util.Collection;
+
+import org.apache.sling.discovery.base.commons.ClusterViewService;
+
+/**
+ * Registry for topology connector clients
+ */
+public interface ConnectorRegistry {
+
+    /** Register an outgoing topology connector using the provided endpoint url **/
+    TopologyConnectorClientInformation registerOutgoingConnector(
+            ClusterViewService clusterViewService, URL topologyConnectorEndpoint);
+
+    /** Lists all outgoing topology connectors **/
+    Collection<TopologyConnectorClientInformation> listOutgoingConnectors();
+
+    /** ping all outgoing topology connectors **/
+    void pingOutgoingConnectors(boolean force);
+
+    /** Unregister an outgoing topology connector identified by the given (connector) id **/
+    boolean unregisterOutgoingConnector(String id);
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistry.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistryImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistryImpl.java?rev=1709601&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistryImpl.java (added)
+++ sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistryImpl.java Tue Oct 20 14:12:31 2015
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.base.connectors.ping;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.discovery.base.commons.ClusterViewService;
+import org.apache.sling.discovery.base.connectors.BaseConfig;
+import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Default implementation of the ConnectorRegistry which
+ * keeps a list of outgoing connectors and is capable of
+ * pinging them.
+ */
+@Component
+@Service(value = ConnectorRegistry.class)
+public class ConnectorRegistryImpl implements ConnectorRegistry {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** A map of id-> topology connector clients currently registered/activate **/
+    private final Map<String, TopologyConnectorClient> outgoingClientsMap = new HashMap<String, TopologyConnectorClient>();
+
+    @Reference
+    private AnnouncementRegistry announcementRegistry;
+
+    @Reference
+    private BaseConfig config;
+
+    /** the local port is added to the announcement as the serverInfo object **/
+    private String port = "";
+
+    public static ConnectorRegistry testConstructor(AnnouncementRegistry announcementRegistry,
+            BaseConfig config) {
+        ConnectorRegistryImpl registry = new ConnectorRegistryImpl();
+        registry.announcementRegistry = announcementRegistry;
+        registry.config = config;
+        // Note that port is not set - but that is only for information purpose
+        // and not useful for testing
+        return registry;
+    }
+    
+    @Activate
+    protected void activate(final ComponentContext cc) {
+        port = cc.getBundleContext().getProperty("org.osgi.service.http.port");
+    }
+    
+    @Deactivate
+    protected void deactivate() {
+        synchronized (outgoingClientsMap) {
+            for (Iterator<TopologyConnectorClient> it = outgoingClientsMap.values().iterator(); it.hasNext();) {
+                final TopologyConnectorClient client = it.next();
+                client.disconnect();
+                it.remove();
+            }
+        }
+    }
+    
+    public TopologyConnectorClientInformation registerOutgoingConnector(
+            final ClusterViewService clusterViewService, final URL connectorUrl) {
+        if (announcementRegistry == null) {
+            logger.error("registerOutgoingConnection: announcementRegistry is null");
+            return null;
+        }
+        TopologyConnectorClient client;
+        synchronized (outgoingClientsMap) {
+            for (Iterator<Entry<String, TopologyConnectorClient>> it = outgoingClientsMap
+                    .entrySet().iterator(); it.hasNext();) {
+                Entry<String, TopologyConnectorClient> entry = it.next();
+                if (entry.getValue().getConnectorUrl().toExternalForm().equals(connectorUrl.toExternalForm())) {
+                    it.remove();
+                    logger.info("registerOutgoingConnection: re-registering connector: "+connectorUrl);
+                }
+            }
+            String serverInfo;
+            try {
+                serverInfo = InetAddress.getLocalHost().getCanonicalHostName()
+                        + ":" + port;
+            } catch (Exception e) {
+                serverInfo = "localhost:" + port;
+            }
+            client = new TopologyConnectorClient(clusterViewService,
+                    announcementRegistry, config, connectorUrl,
+                    serverInfo);
+            outgoingClientsMap.put(client.getId(), client);
+        }
+        client.ping(false);
+        return client;
+    }
+
+    public Collection<TopologyConnectorClientInformation> listOutgoingConnectors() {
+        final List<TopologyConnectorClientInformation> result = new ArrayList<TopologyConnectorClientInformation>();
+        synchronized (outgoingClientsMap) {
+            result.addAll(outgoingClientsMap.values());
+        }
+        return result;
+    }
+
+    public boolean unregisterOutgoingConnector(final String id) {
+        if (id == null || id.length() == 0) {
+            throw new IllegalArgumentException("id must not be null");
+        }
+        synchronized (outgoingClientsMap) {
+            TopologyConnectorClient client = outgoingClientsMap.remove(id);
+            if (client != null) {
+                client.disconnect();
+            }
+            return client != null;
+        }
+    }
+
+    public void pingOutgoingConnectors(boolean force) {
+        List<TopologyConnectorClient> outgoingTemplatesClone;
+        synchronized (outgoingClientsMap) {
+            outgoingTemplatesClone = new ArrayList<TopologyConnectorClient>(
+                    outgoingClientsMap.values());
+        }
+        for (Iterator<TopologyConnectorClient> it = outgoingTemplatesClone
+                .iterator(); it.hasNext();) {
+            it.next().ping(force);
+        }
+    }
+
+}

Propchange: sling/trunk/bundles/extensions/discovery/base/src/main/java/org/apache/sling/discovery/base/connectors/ping/ConnectorRegistryImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native