You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/11/07 09:27:36 UTC
[sling-org-apache-sling-discovery-oak] 01/12: SLING-4603 : added
discovery.oak to the project - this is based on discovery.base,
which handles topology connectors and contains a few other base classes that
were factored out of discovery.impl for exactly this reuse
This is an automated email from the ASF dual-hosted git repository.
rombert pushed a commit to annotated tag org.apache.sling.discovery.oak-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-discovery-oak.git
commit d1c3dced7e62178a17e46aa3be065486f84dc9f0
Author: Stefan Egli <st...@apache.org>
AuthorDate: Wed Oct 21 15:50:37 2015 +0000
SLING-4603 : added discovery.oak to the project - this is based on discovery.base, which handles topology connectors and contains a few other base classes that were factored out of discovery.impl for exactly this reuse
git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/discovery/oak@1709867 13f79535-47bb-0310-9956-ffa450edef68
---
pom.xml | 323 ++++++
.../org/apache/sling/discovery/oak/Config.java | 470 +++++++++
.../sling/discovery/oak/OakDiscoveryService.java | 642 ++++++++++++
.../discovery/oak/TopologyWebConsolePlugin.java | 1048 ++++++++++++++++++++
.../oak/cluster/OakClusterViewService.java | 227 +++++
.../sling/discovery/oak/pinger/OakViewChecker.java | 322 ++++++
.../OSGI-INF/metatype/metatype.properties | 121 +++
.../discovery/oak/OakDiscoveryServiceTest.java | 116 +++
.../discovery/oak/its/OakClusterLoadTest.java | 32 +
.../sling/discovery/oak/its/OakClusterTest.java | 32 +
.../discovery/oak/its/OakSingleInstanceTest.java | 32 +
.../discovery/oak/its/OakTopologyEventTest.java | 43 +
.../discovery/oak/its/setup/OakTestConfig.java | 80 ++
.../oak/its/setup/OakVirtualInstanceBuilder.java | 259 +++++
.../discovery/oak/its/setup/SimulatedLease.java | 59 ++
.../oak/its/setup/SimulatedLeaseCollection.java | 102 ++
src/test/resources/log4j.properties | 26 +
17 files changed, 3934 insertions(+)
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..5ee0709
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,323 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>sling</artifactId>
+ <version>25</version>
+ <relativePath />
+ </parent>
+
+ <artifactId>org.apache.sling.discovery.oak</artifactId>
+ <packaging>bundle</packaging>
+ <version>1.0.0-SNAPSHOT</version>
+
+ <name>Apache Sling Oak-Based Discovery Service</name>
+ <description>Implementation of Apache Sling Discovery based on Jackrabbit Oak using its discovery-lite descriptor for in-cluster view detection and a TopologyView through HTTP POST heartbeats announcing sub-topologies to each other.</description>
+
+ <scm>
+ <connection>scm:svn:http://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/discovery/oak</connection>
+ <developerConnection>scm:svn:https://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/discovery/oak</developerConnection>
+ <url>http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/oak</url>
+ </scm>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <redirectTestOutputToFile>false</redirectTestOutputToFile>
+ <argLine>-Xmx2048m</argLine>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-scr-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <Embed-Dependency>
+ commons-net;inline=org/apache/commons/net/util/SubnetUtils*
+ </Embed-Dependency>
+ </instructions>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr.annotations</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>biz.aQute</groupId>
+ <artifactId>bndlib</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.6.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.jcr.api</artifactId>
+ <version>2.1.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.jcr</groupId>
+ <artifactId>jcr</artifactId>
+ <version>2.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-api</artifactId>
+ <version>2.2.4</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.osgi</artifactId>
+ <version>2.1.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.settings</artifactId>
+ <version>1.2.2</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.api</artifactId>
+ <version>1.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.commons</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- besides including discovery.commons' normal jar above,
+ for testing a few test helper classes are also reused.
+ in order to achieve that, also adding a test/test-jar dependency: -->
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.commons</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.base</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- besides including discovery.base' normal jar above,
+ for testing a few test helper classes are also reused.
+ in order to achieve that, also adding a test/test-jar dependency: -->
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.base</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.api</artifactId>
+ <version>2.4.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.scheduler</artifactId>
+ <version>2.3.4</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.webconsole</artifactId>
+ <version>3.0.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-servlet</artifactId>
+ <version>8.1.2.v20120308</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient-osgi</artifactId>
+ <version>4.3.5</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.json</artifactId>
+ <version>2.0.6</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.launchpad.api</artifactId>
+ <version>1.1.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>3.3</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Testing -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit-addons</groupId>
+ <artifactId>junit-addons</artifactId>
+ <version>1.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jmock</groupId>
+ <artifactId>jmock-junit4</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.testing</artifactId>
+ <version>2.0.16</version>
+ <scope>test</scope>
+ <exclusions>
+ <!-- slf4j simple implementation logs INFO + higher to stdout (we don't want that behaviour) -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-simple</artifactId>
+ </exclusion>
+ <!-- also excluding jcl-over-slf4j as we need a newer version of this which is compatible with slf4j 1.6 -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- using log4j under slf4j to allow fine-grained logging config (see src/test/resources/log4j.properties) -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.jcr.resource</artifactId>
+ <version>2.3.8</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.13</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.threads</artifactId>
+ <version>3.1.0</version>
+ <type>bundle</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.testing.tools</artifactId>
+ <version>1.0.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.testing.sling-mock</artifactId>
+ <version>1.2.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-jcr-commons</artifactId>
+ <version>2.11.0</version>
+ <type>bundle</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>jackrabbit-api</artifactId>
+ <version>2.11.0</version>
+ <type>bundle</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-core</artifactId>
+ <version>1.3.7</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.jackrabbit</groupId>
+ <artifactId>oak-jcr</artifactId>
+ <version>1.3.7</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+</project>
diff --git a/src/main/java/org/apache/sling/discovery/oak/Config.java b/src/main/java/org/apache/sling/discovery/oak/Config.java
new file mode 100644
index 0000000..d2eaa4a
--- /dev/null
+++ b/src/main/java/org/apache/sling/discovery/oak/Config.java
@@ -0,0 +1,470 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.discovery.base.connectors.BaseConfig;
+import org.apache.sling.discovery.commons.providers.spi.base.DiscoveryLiteConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Configuration object used as a central config point for the discovery service
+ * implementation.
+ * <p>
+ * Every configurable property is declared below as a constant holding the OSGi
+ * property name (annotated with {@code @Property}), next to its default value
+ * and the field that receives the value read in {@link #configure(Map)}.
+ */
+@Component(metatype = true, label="%config.name", description="%config.description")
+@Service(value = { Config.class, BaseConfig.class, DiscoveryLiteConfig.class })
+public class Config implements BaseConfig, DiscoveryLiteConfig {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    /** resource used to keep instance information such as last heartbeat, properties, incoming announcements **/
+    private static final String CLUSTERINSTANCES_RESOURCE = "clusterInstances";
+
+    /** resource used to store the sync tokens as part of a topology change **/
+    private static final String SYNC_TOKEN_RESOURCE = "syncTokens";
+
+    /** resource used to store the clusterNodeIds to slingIds map **/
+    private static final String ID_MAP_RESOURCE = "idMap";
+
+    /** Configure the timeout (in seconds) after which an instance is considered dead/crashed. */
+    public static final long DEFAULT_TOPOLOGY_CONNECTOR_TIMEOUT = 120;
+    @Property(longValue=DEFAULT_TOPOLOGY_CONNECTOR_TIMEOUT)
+    public static final String TOPOLOGY_CONNECTOR_TIMEOUT_KEY = "connectorPingTimeout";
+    protected long connectorPingTimeout = DEFAULT_TOPOLOGY_CONNECTOR_TIMEOUT;
+
+    /** Configure the interval (in seconds) according to which the heartbeats are exchanged in the topology. */
+    public static final long DEFAULT_TOPOLOGY_CONNECTOR_INTERVAL = 30;
+    @Property(longValue=DEFAULT_TOPOLOGY_CONNECTOR_INTERVAL)
+    public static final String TOPOLOGY_CONNECTOR_INTERVAL_KEY = "connectorPingInterval";
+    protected long connectorPingInterval = DEFAULT_TOPOLOGY_CONNECTOR_INTERVAL;
+
+    /**
+     * Configure the interval of the discovery-lite check.
+     * NOTE(review): presumably in seconds and presumably the period at which the
+     * discovery-lite descriptor is re-evaluated - confirm against the consumer of
+     * {@link #getDiscoveryLiteCheckInterval()}.
+     */
+    public static final long DEFAULT_DISCOVERY_LITE_CHECK_INTERVAL = 2;
+    @Property(longValue=DEFAULT_DISCOVERY_LITE_CHECK_INTERVAL)
+    public static final String DISCOVERY_LITE_CHECK_INTERVAL_KEY = "discoveryLiteCheckInterval";
+    protected long discoveryLiteCheckInterval = DEFAULT_DISCOVERY_LITE_CHECK_INTERVAL;
+
+    /** Configure the time (in seconds) which must be passed at minimum between sending TOPOLOGY_CHANGING/_CHANGED (avoid flooding). */
+    public static final int DEFAULT_MIN_EVENT_DELAY = 3;
+    @Property(intValue=DEFAULT_MIN_EVENT_DELAY)
+    public static final String MIN_EVENT_DELAY_KEY = "minEventDelay";
+    protected int minEventDelay = DEFAULT_MIN_EVENT_DELAY;
+
+    /** Configure the socket connect timeout for topology connectors. */
+    public static final int DEFAULT_SOCKET_CONNECT_TIMEOUT = 10;
+    @Property(intValue=DEFAULT_SOCKET_CONNECT_TIMEOUT)
+    public static final String SOCKET_CONNECT_TIMEOUT_KEY = "socketConnectTimeout";
+    private int socketConnectTimeout = DEFAULT_SOCKET_CONNECT_TIMEOUT;
+
+    /** Configure the socket read timeout (SO_TIMEOUT) for topology connectors. */
+    public static final int DEFAULT_SO_TIMEOUT = 10;
+    @Property(intValue=DEFAULT_SO_TIMEOUT)
+    public static final String SO_TIMEOUT_KEY = "soTimeout";
+    private int soTimeout = DEFAULT_SO_TIMEOUT;
+
+    /** URLs where to join a topology, eg http://localhost:4502/libs/sling/topology/connector */
+    @Property(cardinality=1024)
+    public static final String TOPOLOGY_CONNECTOR_URLS_KEY = "topologyConnectorUrls";
+    // NOTE(review): the initial value is a one-element array holding null, whereas
+    // configure() stores null when nothing is configured - confirm whether callers
+    // can observe this value before activation.
+    private URL[] topologyConnectorUrls = {null};
+
+    /** list of ips and/or hostnames which are allowed to connect to /libs/sling/topology/connector */
+    private static final String[] DEFAULT_TOPOLOGY_CONNECTOR_WHITELIST = {"localhost","127.0.0.1"};
+    @Property(value={"localhost","127.0.0.1"})
+    public static final String TOPOLOGY_CONNECTOR_WHITELIST_KEY = "topologyConnectorWhitelist";
+    protected String[] topologyConnectorWhitelist = DEFAULT_TOPOLOGY_CONNECTOR_WHITELIST;
+
+    /** Path of resource where to keep discovery information, e.g /var/discovery/oak/ */
+    private static final String DEFAULT_DISCOVERY_RESOURCE_PATH = "/var/discovery/oak/";
+    @Property(value=DEFAULT_DISCOVERY_RESOURCE_PATH, propertyPrivate=true)
+    public static final String DISCOVERY_RESOURCE_PATH_KEY = "discoveryResourcePath";
+    protected String discoveryResourcePath = DEFAULT_DISCOVERY_RESOURCE_PATH;
+
+    /**
+     * If set to true, local-loops of topology connectors are automatically stopped when detected so.
+     */
+    @Property(boolValue=false)
+    private static final String AUTO_STOP_LOCAL_LOOP_ENABLED = "autoStopLocalLoopEnabled";
+
+    /**
+     * If set to true, request body will be gzipped - only works if counter-part accepts gzip-requests!
+     */
+    @Property(boolValue=false)
+    private static final String GZIP_CONNECTOR_REQUESTS_ENABLED = "gzipConnectorRequestsEnabled";
+
+    /**
+     * If set to true, hmac is enabled and the white list is disabled.
+     */
+    @Property(boolValue=false)
+    private static final String HMAC_ENABLED = "hmacEnabled";
+
+    /**
+     * If set to true, and the whitelist is disabled, messages will be encrypted.
+     */
+    @Property(boolValue=false)
+    private static final String ENCRYPTION_ENABLED = "enableEncryption";
+
+    /**
+     * The value of the shared key, shared amongst all instances in the same cluster.
+     */
+    @Property
+    private static final String SHARED_KEY = "sharedKey";
+
+    /**
+     * The default lifetime of a HMAC shared key in ms. (4h)
+     */
+    private static final long DEFAULT_SHARED_KEY_INTERVAL = 3600*1000*4;
+
+    /** Name of the property holding the lifetime of a HMAC shared key. */
+    @Property(longValue=DEFAULT_SHARED_KEY_INTERVAL)
+    private static final String SHARED_KEY_INTERVAL = "hmacSharedKeyTTL";
+
+    /**
+     * The property for defining the backoff factor for standby (loop) connectors
+     */
+    @Property
+    private static final String BACKOFF_STANDBY_FACTOR = "backoffStandbyFactor";
+    private static final int DEFAULT_BACKOFF_STANDBY_FACTOR = 5;
+
+    /**
+     * The property for defining the maximum backoff factor for stable connectors
+     */
+    @Property
+    private static final String BACKOFF_STABLE_FACTOR = "backoffStableFactor";
+    private static final int DEFAULT_BACKOFF_STABLE_FACTOR = 5;
+
+    /** True when auto-stop of a local-loop is enabled. Default is false. **/
+    private boolean autoStopLocalLoopEnabled;
+
+    /**
+     * True when hmac is enabled and the whitelist is disabled.
+     */
+    private boolean hmacEnabled;
+
+    /**
+     * the shared key.
+     */
+    private String sharedKey;
+
+    /**
+     * The key interval.
+     */
+    private long keyInterval;
+
+    /**
+     * true when encryption is enabled.
+     */
+    private boolean encryptionEnabled;
+
+    /**
+     * true when topology connector requests should be gzipped
+     */
+    private boolean gzipConnectorRequestsEnabled;
+
+    /** the backoff factor to be used for standby (loop) connectors **/
+    private int backoffStandbyFactor = DEFAULT_BACKOFF_STANDBY_FACTOR;
+
+    /** the maximum backoff factor to be used for stable connectors **/
+    private int backoffStableFactor = DEFAULT_BACKOFF_STABLE_FACTOR;
+
+    /**
+     * Component activation hook: logs the activation and applies the given properties.
+     * @param properties the component configuration
+     */
+    @Activate
+    protected void activate(final Map<String, Object> properties) {
+        logger.debug("activate: config activated.");
+        configure(properties);
+    }
+
+    /**
+     * Reads every supported setting from the given properties into the fields of
+     * this object, using the declared default whenever
+     * {@link PropertiesUtil} cannot obtain a value for a key.
+     * @param properties the component configuration, must not be null
+     */
+    protected void configure(final Map<String, Object> properties) {
+        this.connectorPingTimeout = PropertiesUtil.toLong(
+                properties.get(TOPOLOGY_CONNECTOR_TIMEOUT_KEY),
+                DEFAULT_TOPOLOGY_CONNECTOR_TIMEOUT);
+        logger.debug("configure: connectorPingTimeout='{}'",
+                this.connectorPingTimeout);
+
+        this.connectorPingInterval = PropertiesUtil.toLong(
+                properties.get(TOPOLOGY_CONNECTOR_INTERVAL_KEY),
+                DEFAULT_TOPOLOGY_CONNECTOR_INTERVAL);
+        logger.debug("configure: connectorPingInterval='{}'",
+                this.connectorPingInterval);
+
+        this.discoveryLiteCheckInterval = PropertiesUtil.toLong(
+                properties.get(DISCOVERY_LITE_CHECK_INTERVAL_KEY),
+                DEFAULT_DISCOVERY_LITE_CHECK_INTERVAL);
+        logger.debug("configure: discoveryLiteCheckInterval='{}'",
+                this.discoveryLiteCheckInterval);
+
+        this.minEventDelay = PropertiesUtil.toInteger(
+                properties.get(MIN_EVENT_DELAY_KEY),
+                DEFAULT_MIN_EVENT_DELAY);
+        logger.debug("configure: minEventDelay='{}'",
+                this.minEventDelay);
+
+        this.socketConnectTimeout = PropertiesUtil.toInteger(
+                properties.get(SOCKET_CONNECT_TIMEOUT_KEY),
+                DEFAULT_SOCKET_CONNECT_TIMEOUT);
+        logger.debug("configure: socketConnectTimeout='{}'",
+                this.socketConnectTimeout);
+
+        this.soTimeout = PropertiesUtil.toInteger(
+                properties.get(SO_TIMEOUT_KEY),
+                DEFAULT_SO_TIMEOUT);
+        logger.debug("configure: soTimeout='{}'",
+                this.soTimeout);
+
+        // collect every non-empty, well-formed connector URL; malformed entries
+        // are logged and skipped so that one bad entry does not disable the rest
+        final String[] configuredUrls = PropertiesUtil.toStringArray(
+                properties.get(TOPOLOGY_CONNECTOR_URLS_KEY), null);
+        final List<URL> urls = new LinkedList<URL>();
+        if (configuredUrls != null) {
+            for (final String anUrlStr : configuredUrls) {
+                if (anUrlStr == null || anUrlStr.length() == 0) {
+                    continue;
+                }
+                try {
+                    final URL url = new URL(anUrlStr);
+                    logger.debug("configure: a topologyConnectorbUrl='{}'",
+                            url);
+                    urls.add(url);
+                } catch (MalformedURLException e) {
+                    logger.error("configure: could not set a topologyConnectorUrl: " + e,
+                            e);
+                }
+            }
+        }
+        if (urls.isEmpty()) {
+            this.topologyConnectorUrls = null;
+            logger.debug("configure: no (valid) topologyConnectorUrls configured");
+        } else {
+            this.topologyConnectorUrls = urls.toArray(new URL[urls.size()]);
+            logger.debug("configure: number of topologyConnectorUrls='{}''",
+                    urls.size());
+        }
+
+        this.topologyConnectorWhitelist = PropertiesUtil.toStringArray(
+                properties.get(TOPOLOGY_CONNECTOR_WHITELIST_KEY),
+                DEFAULT_TOPOLOGY_CONNECTOR_WHITELIST);
+        // cast to Object: a String[] would otherwise be taken as the logger's
+        // argument array, and only its first element would end up in the message
+        logger.debug("configure: topologyConnectorWhitelist='{}'",
+                (Object) this.topologyConnectorWhitelist);
+
+        // normalize the path to end with exactly one '/'; an empty path or a
+        // path consisting only of slashes falls back to the default
+        String resourcePath = PropertiesUtil.toString(
+                properties.get(DISCOVERY_RESOURCE_PATH_KEY),
+                "");
+        while (resourcePath.endsWith("/")) {
+            resourcePath = resourcePath.substring(0, resourcePath.length()-1);
+        }
+        resourcePath = resourcePath + "/";
+        if (resourcePath.length()<=1) {
+            resourcePath = DEFAULT_DISCOVERY_RESOURCE_PATH;
+        }
+        this.discoveryResourcePath = resourcePath;
+        logger.debug("configure: discoveryResourcePath='{}'",
+                this.discoveryResourcePath);
+
+        autoStopLocalLoopEnabled = PropertiesUtil.toBoolean(properties.get(AUTO_STOP_LOCAL_LOOP_ENABLED), false);
+        gzipConnectorRequestsEnabled = PropertiesUtil.toBoolean(properties.get(GZIP_CONNECTOR_REQUESTS_ENABLED), false);
+
+        // NOTE(review): the fallback used here is true while the @Property on
+        // HMAC_ENABLED declares false - confirm which default is intended.
+        hmacEnabled = PropertiesUtil.toBoolean(properties.get(HMAC_ENABLED), true);
+        encryptionEnabled = PropertiesUtil.toBoolean(properties.get(ENCRYPTION_ENABLED), false);
+        sharedKey = PropertiesUtil.toString(properties.get(SHARED_KEY), null);
+        // read the configured value; passing the property name itself (as done
+        // previously) can never be parsed as a long and always yields the default
+        keyInterval = PropertiesUtil.toLong(properties.get(SHARED_KEY_INTERVAL), DEFAULT_SHARED_KEY_INTERVAL);
+
+        backoffStandbyFactor = PropertiesUtil.toInteger(properties.get(BACKOFF_STANDBY_FACTOR),
+                DEFAULT_BACKOFF_STANDBY_FACTOR);
+        backoffStableFactor = PropertiesUtil.toInteger(properties.get(BACKOFF_STABLE_FACTOR),
+                DEFAULT_BACKOFF_STABLE_FACTOR);
+    }
+
+    /**
+     * Returns the socket connect() timeout used by the topology connector, 0 disables the timeout
+     * @return the socket connect() timeout used by the topology connector, 0 disables the timeout
+     */
+    public int getSocketConnectTimeout() {
+        return socketConnectTimeout;
+    }
+
+    /**
+     * Returns the socket read timeout (SO_TIMEOUT) used by the topology connector, 0 disables the timeout
+     * @return the socket read timeout (SO_TIMEOUT) used by the topology connector, 0 disables the timeout
+     */
+    public int getSoTimeout() {
+        return soTimeout;
+    }
+
+    /**
+     * Returns the minimum time (in seconds) between sending TOPOLOGY_CHANGING/_CHANGED events - to avoid flooding
+     * @return the minimum time (in seconds) between sending TOPOLOGY_CHANGING/_CHANGED events - to avoid flooding
+     */
+    public int getMinEventDelay() {
+        return minEventDelay;
+    }
+
+    /**
+     * Returns the URLs to which to open a topology connector - or null if no (valid)
+     * topology connector URL is configured
+     * @return the URLs to which to open a topology connector - or null if no (valid)
+     * topology connector URL is configured
+     */
+    public URL[] getTopologyConnectorURLs() {
+        return topologyConnectorUrls;
+    }
+
+    /**
+     * Returns the hostnames and/or ip addresses which are allowed as
+     * remote hosts to open connections to the topology connector servlet
+     * @return an array of hostnames and/or ip addresses which are allowed as
+     * remote hosts to open connections to the topology connector servlet
+     */
+    public String[] getTopologyConnectorWhitelist() {
+        return topologyConnectorWhitelist;
+    }
+
+    /**
+     * Returns the discovery resource path as normalized by {@link #configure(Map)},
+     * i.e. ending with a '/'.
+     * @return the discovery resource path
+     */
+    protected String getDiscoveryResourcePath() {
+        return discoveryResourcePath;
+    }
+
+    /**
+     * Returns the resource path where cluster instance informations are stored.
+     * @return the resource path where cluster instance informations are stored
+     */
+    public String getClusterInstancesPath() {
+        return getDiscoveryResourcePath() + CLUSTERINSTANCES_RESOURCE;
+    }
+
+    /**
+     * @return the resource path where the sync tokens are stored
+     */
+    @Override
+    public String getSyncTokenPath() {
+        return getDiscoveryResourcePath() + SYNC_TOKEN_RESOURCE;
+    }
+
+    /**
+     * @return the resource path where the clusterNodeIds to slingIds map is stored
+     */
+    @Override
+    public String getIdMapPath() {
+        return getDiscoveryResourcePath() + ID_MAP_RESOURCE;
+    }
+
+    /**
+     * @return true if hmac is enabled.
+     */
+    public boolean isHmacEnabled() {
+        return hmacEnabled;
+    }
+
+    /**
+     * @return the shared key
+     */
+    public String getSharedKey() {
+        return sharedKey;
+    }
+
+    /**
+     * @return the interval of the shared key for hmac.
+     */
+    public long getKeyInterval() {
+        return keyInterval;
+    }
+
+    /**
+     * @return true if encryption is enabled.
+     */
+    public boolean isEncryptionEnabled() {
+        return encryptionEnabled;
+    }
+
+    /**
+     * @return true if requests on the topology connector should be gzipped
+     * (which only works if the server accepts that.. ie discovery.impl 1.0.4+)
+     */
+    public boolean isGzipConnectorRequestsEnabled() {
+        return gzipConnectorRequestsEnabled;
+    }
+
+    /**
+     * @return true if the auto-stopping of local-loop topology connectors is enabled.
+     */
+    public boolean isAutoStopLocalLoopEnabled() {
+        return autoStopLocalLoopEnabled;
+    }
+
+    /**
+     * Returns the backoff factor to be used for standby (loop) connectors
+     * @return the backoff factor to be used for standby (loop) connectors
+     */
+    public int getBackoffStandbyFactor() {
+        return backoffStandbyFactor;
+    }
+
+    /**
+     * Returns the (maximum) backoff factor to be used for stable connectors
+     * @return the (maximum) backoff factor to be used for stable connectors
+     */
+    public int getBackoffStableFactor() {
+        return backoffStableFactor;
+    }
+
+    /**
+     * Returns the backoff interval for standby (loop) connectors in seconds,
+     * computed as the standby backoff factor times the connector ping interval
+     * @return the backoff interval for standby (loop) connectors in seconds,
+     * or -1 if the standby backoff factor is 1 or less
+     */
+    public long getBackoffStandbyInterval() {
+        final int factor = getBackoffStandbyFactor();
+        if (factor<=1) {
+            return -1;
+        } else {
+            return factor * getConnectorPingInterval();
+        }
+    }
+
+    /**
+     * @return the configured connector ping interval
+     */
+    @Override
+    public long getConnectorPingInterval() {
+        return connectorPingInterval;
+    }
+
+    /**
+     * @return the configured connector ping timeout
+     */
+    @Override
+    public long getConnectorPingTimeout() {
+        return connectorPingTimeout;
+    }
+
+    /**
+     * @return the configured discovery-lite check interval
+     */
+    public long getDiscoveryLiteCheckInterval() {
+        return discoveryLiteCheckInterval;
+    }
+
+    @Override
+    public long getBgTimeoutMillis() {
+        // TODO: currently hard coded
+        return -1;
+    }
+
+    @Override
+    public long getBgIntervalMillis() {
+        // TODO: currently hard coded
+        return 1000;
+    }
+}
diff --git a/src/main/java/org/apache/sling/discovery/oak/OakDiscoveryService.java b/src/main/java/org/apache/sling/discovery/oak/OakDiscoveryService.java
new file mode 100644
index 0000000..084fe50
--- /dev/null
+++ b/src/main/java/org/apache/sling/discovery/oak/OakDiscoveryService.java
@@ -0,0 +1,642 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak;
+
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.DiscoveryService;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.PropertyProvider;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.base.commons.BaseDiscoveryService;
+import org.apache.sling.discovery.base.commons.ClusterViewService;
+import org.apache.sling.discovery.base.commons.DefaultTopologyView;
+import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
+import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.DefaultClusterView;
+import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription;
+import org.apache.sling.discovery.commons.providers.ViewStateManager;
+import org.apache.sling.discovery.commons.providers.base.ViewStateManagerFactory;
+import org.apache.sling.discovery.commons.providers.spi.base.IdMapService;
+import org.apache.sling.discovery.commons.providers.spi.base.OakSyncTokenConsistencyService;
+import org.apache.sling.discovery.commons.providers.util.PropertyNameHelper;
+import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
+import org.apache.sling.discovery.oak.pinger.OakViewChecker;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Constants;
+import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This implementation of the cross-cluster service uses the view manager
+ * implementation for detecting changes in a cluster and only supports one
+ * cluster (the one this instance is part of).
+ */
+@Component(immediate = true)
+@Service(value = { DiscoveryService.class, OakDiscoveryService.class })
+public class OakDiscoveryService extends BaseDiscoveryService {
+
+ private final static Logger logger = LoggerFactory.getLogger(OakDiscoveryService.class);
+
+ @Reference
+ private SlingSettingsService settingsService;
+
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, policy = ReferencePolicy.DYNAMIC, referenceInterface = TopologyEventListener.class)
+ private TopologyEventListener[] eventListeners = new TopologyEventListener[0];
+
+ /**
+ * All property providers.
+ */
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_MULTIPLE, policy = ReferencePolicy.DYNAMIC, referenceInterface = PropertyProvider.class, updated = "updatedPropertyProvider")
+ private List<ProviderInfo> providerInfos = new ArrayList<ProviderInfo>();
+
+ /** lock object used for synching bind/unbind and topology event sending **/
+ private final Object lock = new Object();
+
+ /**
+ * whether or not this service is activated - necessary to avoid sending
+ * events to discovery awares before activate is done
+ **/
+ private boolean activated = false;
+
+ @Reference
+ private ResourceResolverFactory resourceResolverFactory;
+
+ @Reference
+ private Scheduler scheduler;
+
+ @Reference
+ private OakViewChecker oakViewChecker;
+
+ @Reference
+ private AnnouncementRegistry announcementRegistry;
+
+ @Reference
+ private ConnectorRegistry connectorRegistry;
+
+ @Reference
+ private ClusterViewService clusterViewService;
+
+ @Reference
+ private Config config;
+
+ @Reference
+ private IdMapService idMapService;
+
+ @Reference
+ private OakSyncTokenConsistencyService consistencyService;
+
+ /** the slingId of the local instance **/
+ private String slingId;
+
+ private ServiceRegistration mbeanRegistration;
+
+ private ViewStateManager viewStateManager;
+
+ private final ReentrantLock viewStateManagerLock = new ReentrantLock();
+
+ private final List<TopologyEventListener> pendingListeners = new LinkedList<TopologyEventListener>();
+
+ /**
+ * Creates an OakDiscoveryService and assigns the given collaborators
+ * directly to the fields that are otherwise marked with {@code @Reference}.
+ * As the name indicates this is meant for tests.
+ * <p>
+ * The returned instance is not activated: activate() is not invoked here.
+ *
+ * @param connectorPinger stored in the oakViewChecker field
+ * @param factory stored in the resourceResolverFactory field
+ * @return the new, not yet activated service instance
+ */
+ public static OakDiscoveryService testConstructor(SlingSettingsService settingsService,
+ AnnouncementRegistry announcementRegistry,
+ ConnectorRegistry connectorRegistry,
+ ClusterViewService clusterViewService,
+ Config config,
+ OakViewChecker connectorPinger,
+ Scheduler scheduler,
+ IdMapService idMapService,
+ OakSyncTokenConsistencyService consistencyService,
+ ResourceResolverFactory factory) {
+ OakDiscoveryService discoService = new OakDiscoveryService();
+ discoService.settingsService = settingsService;
+ discoService.announcementRegistry = announcementRegistry;
+ discoService.connectorRegistry = connectorRegistry;
+ discoService.clusterViewService = clusterViewService;
+ discoService.config = config;
+ discoService.oakViewChecker = connectorPinger;
+ discoService.scheduler = scheduler;
+ discoService.idMapService = idMapService;
+ discoService.consistencyService = consistencyService;
+ discoService.resourceResolverFactory = factory;
+ return discoService;
+ }
+
+ protected void handleIsolatedFromTopology() {
+ if (oakViewChecker!=null) {
+ // SLING-5030 part 2: when we detect being isolated we should
+ // step at the end of the leader-election queue and
+ // that can be achieved by resetting the leaderElectionId
+ // (which will in turn take effect on the next round of
+ // voting, or also double-checked when the local instance votes)
+ //
+ //TODO:
+ // Note that when the local instance doesn't notice
+ // an 'ISOLATED_FROM_TOPOLOGY' case, then the leaderElectionId
+ // will not be reset. Which means that it then could potentially
+ // regain leadership.
+ if (oakViewChecker.resetLeaderElectionId()) {
+ logger.info("getTopology: reset leaderElectionId to force this instance to the end of the instance order (thus incl not to remain leader)");
+ }
+ }
+ }
+
+ /**
+  * Activate this service.
+  * <p>
+  * In order this method: reads the local slingId, creates the
+  * viewStateManager (optionally installing a min-event-delay handler),
+  * installs a not-current, isolated topology as the old view, initializes
+  * the oakViewChecker, and then - while holding viewStateManagerLock -
+  * marks the service as activated, hands a current view (if any) to the
+  * viewStateManager and binds all listeners that were queued in
+  * pendingListeners before activation. Finally the configured topology
+  * connector URLs are registered as outgoing connectors.
+  *
+  * @param bundleContext the bundle context (not used by this method)
+  * @throws IllegalStateException if settingsService or oakViewChecker is not set
+  */
+ @Activate
+ protected void activate(final BundleContext bundleContext) {
+     logger.debug("OakDiscoveryService activating...");
+
+     if (settingsService == null) {
+         throw new IllegalStateException("settingsService not found");
+     }
+     if (oakViewChecker == null) {
+         throw new IllegalStateException("oakViewChecker not found");
+     }
+
+     slingId = settingsService.getSlingId();
+
+     // the viewStateManager only exists from here on; listeners bound
+     // earlier were queued in pendingListeners and are bound further below
+     viewStateManager = ViewStateManagerFactory.newViewStateManager(viewStateManagerLock, consistencyService);
+
+     if (config.getMinEventDelay()>0) {
+         viewStateManager.installMinEventDelayHandler(this, scheduler, config.getMinEventDelay());
+     }
+
+     final String isolatedClusterId = UUID.randomUUID().toString();
+     {
+         // create a pre-voting/isolated topologyView which would be used
+         // until the first voting has finished.
+         // this way for the single-instance case the clusterId can
+         // remain the same between a getTopology() that is invoked before
+         // the first TOPOLOGY_INIT and afterwards
+         DefaultClusterView isolatedCluster = new DefaultClusterView(isolatedClusterId);
+         Map<String, String> emptyProperties = new HashMap<String, String>();
+         DefaultInstanceDescription isolatedInstance =
+                 new DefaultInstanceDescription(isolatedCluster, true, true, slingId, emptyProperties);
+         Collection<InstanceDescription> col = new ArrayList<InstanceDescription>();
+         col.add(isolatedInstance);
+         final DefaultTopologyView topology = new DefaultTopologyView();
+         topology.addInstances(col);
+         topology.setNotCurrent();
+         setOldView(topology);
+     }
+     setOldView((DefaultTopologyView) getTopology());
+     getOldView().setNotCurrent();
+
+     // make sure the first heartbeat is issued as soon as possible - which
+     // is right after this service starts. since the two (discoveryservice
+     // and oakViewChecker) need to know each other, the discoveryservice
+     // is passed on to the oakViewChecker in this initialize call.
+     oakViewChecker.initialize(this);
+
+     viewStateManagerLock.lock();
+     try {
+         viewStateManager.handleActivated();
+
+         doUpdateProperties();
+
+         DefaultTopologyView newView = (DefaultTopologyView) getTopology();
+         if (newView.isCurrent()) {
+             viewStateManager.handleNewView(newView);
+         } else {
+             // SLING-3750: just issue a log.info about the delaying
+             logger.info("activate: this instance is in isolated mode and must yet finish voting before it can send out TOPOLOGY_INIT.");
+         }
+         activated = true;
+         setOldView(newView);
+
+         // in case bind got called before activate we now have pending listeners,
+         // bind them to the viewstatemanager too
+         for (TopologyEventListener listener : pendingListeners) {
+             viewStateManager.bind(listener);
+         }
+         pendingListeners.clear();
+     } finally {
+         // the lock is a final, always initialized field - no null check needed
+         viewStateManagerLock.unlock();
+     }
+
+     final URL[] topologyConnectorURLs = config.getTopologyConnectorURLs();
+     if (topologyConnectorURLs != null) {
+         for (final URL aURL : topologyConnectorURLs) {
+             if (aURL == null) {
+                 continue;
+             }
+             try {
+                 logger.info("activate: registering outgoing topology connector to "+aURL);
+                 connectorRegistry.registerOutgoingConnector(clusterViewService, aURL);
+             } catch (final Exception e) {
+                 // a single failing connector must not prevent activation
+                 logger.info("activate: could not register url: "+aURL+" due to: "+e, e);
+             }
+         }
+     }
+
+     logger.debug("OakDiscoveryService activated.");
+ }
+
+ /**
+  * Deactivate this service: informs the viewStateManager, clears the
+  * activated flag (both under viewStateManagerLock) and afterwards
+  * unregisters the mbean registration if there is one.
+  */
+ @Deactivate
+ protected void deactivate() {
+     logger.debug("OakDiscoveryService deactivated.");
+     viewStateManagerLock.lock();
+     try {
+         viewStateManager.handleDeactivated();
+         activated = false;
+     } finally {
+         viewStateManagerLock.unlock();
+     }
+
+     if (this.mbeanRegistration == null) {
+         return;
+     }
+     try {
+         this.mbeanRegistration.unregister();
+         // only forget the registration once unregistering succeeded
+         this.mbeanRegistration = null;
+     } catch (Exception e) {
+         logger.error("deactivate: Error on unregister: "+e, e);
+     }
+ }
+
+ /**
+  * Bind a topology event listener.
+  * <p>
+  * Once the service is activated the listener is handed straight to the
+  * viewStateManager; before that it is queued in pendingListeners, which
+  * activate() drains. Runs under viewStateManagerLock.
+  */
+ protected void bindTopologyEventListener(final TopologyEventListener eventListener) {
+     viewStateManagerLock.lock();
+     try {
+         if (activated) {
+             viewStateManager.bind(eventListener);
+         } else {
+             pendingListeners.add(eventListener);
+         }
+     } finally {
+         viewStateManagerLock.unlock();
+     }
+ }
+
+ /**
+  * Unbind a topology event listener.
+  * <p>
+  * Once the service is activated the listener is unbound from the
+  * viewStateManager; before that it is simply taken out of the
+  * pendingListeners queue again. Runs under viewStateManagerLock.
+  */
+ protected void unbindTopologyEventListener(final TopologyEventListener eventListener) {
+     viewStateManagerLock.lock();
+     try {
+         if (activated) {
+             viewStateManager.unbind(eventListener);
+         } else {
+             pendingListeners.remove(eventListener);
+         }
+     } finally {
+         viewStateManagerLock.unlock();
+     }
+ }
+
+ /**
+ * Bind a new property provider.
+ * <p>
+ * Logs the bind and delegates to the internal bind method while holding
+ * the {@code lock} monitor.
+ *
+ * @param propertyProvider the provider being bound
+ * @param props the service properties of the provider
+ */
+ protected void bindPropertyProvider(final PropertyProvider propertyProvider,
+ final Map<String, Object> props) {
+ logger.debug("bindPropertyProvider: Binding PropertyProvider {}",
+ propertyProvider);
+
+ synchronized (lock) {
+ this.bindPropertyProviderInteral(propertyProvider, props);
+ }
+ }
+
+ /**
+ * Bind a new property provider (internal part).
+ * <p>
+ * Wraps the provider in a ProviderInfo, adds it to providerInfos, re-sorts
+ * that list, writes the resulting properties via doUpdateProperties() and
+ * finally triggers handlePotentialTopologyChange().
+ * The callers visible in this class invoke it while synchronized on {@code lock}.
+ *
+ * @param propertyProvider the provider being bound
+ * @param props the service properties of the provider
+ */
+ private void bindPropertyProviderInteral(final PropertyProvider propertyProvider,
+ final Map<String, Object> props) {
+ final ProviderInfo info = new ProviderInfo(propertyProvider, props);
+ this.providerInfos.add(info);
+ // keep the list ordered as defined by ProviderInfo.compareTo
+ Collections.sort(this.providerInfos);
+ this.doUpdateProperties();
+ handlePotentialTopologyChange();
+ }
+
+ /**
+  * Update a property provider.
+  * <p>
+  * Implemented as an unbind (without writing properties in between)
+  * followed by a fresh bind, both under the {@code lock} monitor, so that
+  * ranking and property names are re-read from the new service properties.
+  *
+  * @param propertyProvider the provider whose service properties changed
+  * @param props the updated service properties of the provider
+  */
+ protected void updatedPropertyProvider(final PropertyProvider propertyProvider,
+         final Map<String, Object> props) {
+     logger.debug("updatedPropertyProvider: Updating PropertyProvider {}",
+             propertyProvider);
+
+     synchronized (lock) {
+         // update=false: the subsequent bind writes the properties anyway
+         this.unbindPropertyProviderInternal(propertyProvider, props, false);
+         this.bindPropertyProviderInteral(propertyProvider, props);
+     }
+ }
+
+ /**
+ * Unbind a property provider.
+ * <p>
+ * Logs the unbind and delegates to the internal unbind method (with
+ * update=true) while holding the {@code lock} monitor.
+ *
+ * @param propertyProvider the provider being unbound
+ * @param props the service properties of the provider
+ */
+ protected void unbindPropertyProvider(final PropertyProvider propertyProvider,
+ final Map<String, Object> props) {
+ logger.debug("unbindPropertyProvider: Releasing PropertyProvider {}",
+ propertyProvider);
+ synchronized (lock) {
+ this.unbindPropertyProviderInternal(propertyProvider, props, true);
+ }
+ }
+
+ /**
+ * Unbind a property provider (internal part).
+ * <p>
+ * Removes the matching entry from providerInfos; ProviderInfo.equals
+ * compares by service id only. If an entry was removed and {@code update}
+ * is true, the properties are rewritten and a potential topology change is handled.
+ *
+ * @param propertyProvider the provider being unbound
+ * @param props the service properties of the provider
+ * @param update whether to rewrite the properties and handle a potential
+ * topology change after a successful removal
+ */
+ private void unbindPropertyProviderInternal(
+ final PropertyProvider propertyProvider,
+ final Map<String, Object> props, final boolean update) {
+
+ // note: the ProviderInfo constructor calls refreshProperties(), so the
+ // provider is queried once more here although the info only serves as lookup key
+ final ProviderInfo info = new ProviderInfo(propertyProvider, props);
+ if ( this.providerInfos.remove(info) && update ) {
+ this.doUpdateProperties();
+ this.handlePotentialTopologyChange();
+ }
+ }
+
+ /**
+  * Update the properties by inquiring the PropertyProvider's current values.
+  * <p>
+  * Within this class the method is invoked on activation, whenever a
+  * property provider is bound, updated or unbound, and via
+  * {@link #updateProperties()}.
+  * The properties are stored in the repository under
+  * {@code Config.getClusterInstancesPath() + "/" + slingId + "/properties"}.
+  * <p>
+  * Stored keys that are no longer provided are removed, except keys
+  * containing a ':' which are left alone. The repository is only committed
+  * if at least one property was removed, added or changed (SLING-3389).
+  * Does nothing if the resourceResolverFactory is not yet set.
+  *
+  * @throws RuntimeException if logging in to or writing to the repository fails
+  * @see Config#getClusterInstancesPath()
+  */
+ private void doUpdateProperties() {
+     if (resourceResolverFactory == null) {
+         // cannot update the properties then..
+         logger.debug("doUpdateProperties: too early to update the properties. resourceResolverFactory not yet set.");
+         return;
+     } else {
+         logger.debug("doUpdateProperties: updating properties now..");
+     }
+
+     // collect the current values of all providers; providerInfos is kept
+     // sorted, so entries later in the list overwrite earlier ones
+     final Map<String, String> newProps = new HashMap<String, String>();
+     for (final ProviderInfo info : this.providerInfos) {
+         info.refreshProperties();
+         newProps.putAll(info.properties);
+     }
+
+     ResourceResolver resourceResolver = null;
+     try {
+         resourceResolver = resourceResolverFactory
+                 .getAdministrativeResourceResolver(null);
+
+         Resource myInstance = ResourceHelper
+                 .getOrCreateResource(
+                         resourceResolver,
+                         config.getClusterInstancesPath()
+                                 + "/" + slingId + "/properties");
+         // SLING-2879 - revert/refresh resourceResolver here to work
+         // around a potential issue with jackrabbit in a clustered environment
+         resourceResolver.revert();
+         resourceResolver.refresh();
+
+         boolean anyChanges = false;
+
+         final ModifiableValueMap myInstanceMap = myInstance.adaptTo(ModifiableValueMap.class);
+         // iterate over a copy as entries are removed from the map on the way
+         final Set<String> keys = new HashSet<String>(myInstanceMap.keySet());
+         for(final String key : keys) {
+             if (newProps.containsKey(key)) {
+                 // perfect
+                 continue;
+             } else if (key.indexOf(":")!=-1) {
+                 // ignore
+                 continue;
+             } else {
+                 // remove - and remember that there is something to commit,
+                 // otherwise a removal-only update would silently be dropped
+                 if (logger.isDebugEnabled()) {
+                     logger.debug("doUpdateProperties: removed: {}", key);
+                 }
+                 myInstanceMap.remove(key);
+                 anyChanges = true;
+             }
+         }
+
+         for(final Entry<String, String> entry : newProps.entrySet()) {
+             Object existingValue = myInstanceMap.get(entry.getKey());
+             if (entry.getValue().equals(existingValue)) {
+                 // SLING-3389: dont rewrite the properties if nothing changed!
+                 if (logger.isDebugEnabled()) {
+                     logger.debug("doUpdateProperties: unchanged: {}={}", entry.getKey(), entry.getValue());
+                 }
+                 continue;
+             }
+             if (logger.isDebugEnabled()) {
+                 logger.debug("doUpdateProperties: changed: {}={}", entry.getKey(), entry.getValue());
+             }
+             anyChanges = true;
+             myInstanceMap.put(entry.getKey(), entry.getValue());
+         }
+
+         if (anyChanges) {
+             resourceResolver.commit();
+         }
+     } catch (LoginException e) {
+         logger.error(
+                 "doUpdateProperties: could not log in administratively: " + e, e);
+         throw new RuntimeException("Could not log in to repository (" + e
+                 + ")", e);
+     } catch (PersistenceException e) {
+         logger.error("doUpdateProperties: got a PersistenceException: " + e, e);
+         throw new RuntimeException(
+                 "Exception while talking to repository (" + e + ")", e);
+     } finally {
+         if (resourceResolver != null) {
+             resourceResolver.close();
+         }
+     }
+
+     logger.debug("doUpdateProperties: updating properties done.");
+ }
+
+ /**
+ * Update the properties and send a topology event if applicable.
+ * <p>
+ * Runs doUpdateProperties() followed by handlePotentialTopologyChange()
+ * while holding the {@code lock} monitor.
+ */
+ public void updateProperties() {
+ synchronized (lock) {
+ logger.debug("updateProperties: calling doUpdateProperties.");
+ doUpdateProperties();
+ logger.debug("updateProperties: calling handlePotentialTopologyChange.");
+ handlePotentialTopologyChange();
+ logger.debug("updateProperties: done.");
+ }
+ }
+
+ /**
+  * Internal class caching some provider infos like service id and ranking.
+  * <p>
+  * Identity ({@link #equals(Object)} / {@link #hashCode()}) is defined by
+  * the service id only. The natural order is ascending service ranking and,
+  * for equal rankings, descending service id.
+  */
+ private final static class ProviderInfo implements Comparable<ProviderInfo> {
+
+     /** the wrapped provider */
+     public final PropertyProvider provider;
+     /** raw value of the PropertyProvider.PROPERTY_PROPERTIES service property (String or String[]) */
+     public final Object propertyProperties;
+     /** the service ranking, 0 if absent or not an Integer */
+     public final int ranking;
+     /** the OSGi service id */
+     public final long serviceId;
+     /** the valid, non-null properties read during the last refreshProperties() */
+     public final Map<String, String> properties = new HashMap<String, String>();
+
+     /**
+      * Reads ranking, service id and property names from the service
+      * properties and immediately fetches the current property values.
+      *
+      * @param provider the property provider
+      * @param serviceProps the service properties of the provider; must
+      *        contain Constants.SERVICE_ID as a Long
+      */
+     public ProviderInfo(final PropertyProvider provider,
+             final Map<String, Object> serviceProps) {
+         this.provider = provider;
+         this.propertyProperties = serviceProps.get(PropertyProvider.PROPERTY_PROPERTIES);
+         final Object sr = serviceProps.get(Constants.SERVICE_RANKING);
+         if (sr instanceof Integer) {
+             this.ranking = (Integer) sr;
+         } else {
+             // missing or of an unexpected type: fall back to the default ranking
+             this.ranking = 0;
+         }
+         this.serviceId = (Long) serviceProps.get(Constants.SERVICE_ID);
+         refreshProperties();
+     }
+
+     /**
+      * Re-reads all announced properties from the provider, dropping
+      * null values and invalid property names.
+      */
+     public void refreshProperties() {
+         properties.clear();
+         if (this.propertyProperties instanceof String) {
+             final String val = provider.getProperty((String) this.propertyProperties);
+             if (val != null) {
+                 putPropertyIfValid((String) this.propertyProperties, val);
+             }
+         } else if (this.propertyProperties instanceof String[]) {
+             for (final String name : (String[]) this.propertyProperties) {
+                 final String val = provider.getProperty(name);
+                 if (val != null) {
+                     putPropertyIfValid(name, val);
+                 }
+             }
+         }
+     }
+
+     /** SLING-2883 : put property only if valid **/
+     private void putPropertyIfValid(final String name, final String val) {
+         if (PropertyNameHelper.isValidPropertyName(name)) {
+             this.properties.put(name, val);
+         }
+     }
+
+     /**
+      * Orders by ranking ascending, then by service id descending.
+      * Returns 0 for equal ranking and service id so that the
+      * Comparable contract (sgn(a.compareTo(b)) == -sgn(b.compareTo(a)))
+      * holds, which Collections.sort relies on.
+      *
+      * @see java.lang.Comparable#compareTo(java.lang.Object)
+      */
+     public int compareTo(final ProviderInfo o) {
+         // Sort by rank in ascending order.
+         if (this.ranking < o.ranking) {
+             return -1; // lower rank
+         } else if (this.ranking > o.ranking) {
+             return 1; // higher rank
+         }
+         // If ranks are equal, then sort by service id in descending order.
+         if (this.serviceId < o.serviceId) {
+             return 1;
+         } else if (this.serviceId > o.serviceId) {
+             return -1;
+         }
+         return 0;
+     }
+
+     @Override
+     public boolean equals(final Object obj) {
+         if (obj instanceof ProviderInfo) {
+             return ((ProviderInfo) obj).serviceId == this.serviceId;
+         }
+         return false;
+     }
+
+     @Override
+     public int hashCode() {
+         // must be derived from the same field equals() uses
+         return (int) (serviceId ^ (serviceId >>> 32));
+     }
+ }
+
+ /**
+  * Handle the fact that the topology has likely changed.
+  * <p>
+  * Ignored while the service is not activated. Otherwise the current
+  * topology is fetched and, under viewStateManagerLock, either passed to
+  * the viewStateManager as new view (if it is current) or reported to it
+  * as changing.
+  */
+ public void handlePotentialTopologyChange() {
+     viewStateManagerLock.lock();
+     try {
+         if (!activated) {
+             logger.debug("handlePotentialTopologyChange: not yet activated, ignoring");
+             return;
+         }
+         final BaseTopologyView latestView = (BaseTopologyView) getTopology();
+         if (!latestView.isCurrent()) {
+             // if we don't have a view, then we might have to send
+             // a CHANGING event, let that be decided by the viewStateManager as well
+             viewStateManager.handleChanging();
+             return;
+         }
+         // if we have a valid view, let the viewStateManager do the
+         // comparison and sending of an event, if necessary
+         viewStateManager.handleNewView(latestView);
+     } finally {
+         viewStateManagerLock.unlock();
+     }
+ }
+
+ /**
+ * Handle the fact that the topology has started to change - inform the listeners asap.
+ * <p>
+ * Delegates directly to viewStateManager.handleChanging().
+ * NOTE(review): unlike handlePotentialTopologyChange() this neither checks
+ * the activated flag nor takes viewStateManagerLock here; viewStateManager is
+ * only assigned in activate(), so a call before activation would hit a null
+ * reference - confirm that callers cannot do that.
+ */
+ public void handleTopologyChanging() {
+ logger.debug("handleTopologyChanging: invoking viewStateManager.handlechanging");
+ viewStateManager.handleChanging();
+ }
+
+ /**
+ * @return the ClusterViewService held in the clusterViewService field
+ */
+ protected ClusterViewService getClusterViewService() {
+ return clusterViewService;
+ }
+
+ /**
+ * @return the AnnouncementRegistry held in the announcementRegistry field
+ */
+ protected AnnouncementRegistry getAnnouncementRegistry() {
+ return announcementRegistry;
+ }
+
+ /**
+ * For testing only.
+ * @return the viewStateManager; null until activate() has created it
+ */
+ public ViewStateManager getViewStateManager() {
+ return viewStateManager;
+ }
+
+}
diff --git a/src/main/java/org/apache/sling/discovery/oak/TopologyWebConsolePlugin.java b/src/main/java/org/apache/sling/discovery/oak/TopologyWebConsolePlugin.java
new file mode 100644
index 0000000..2c635a4
--- /dev/null
+++ b/src/main/java/org/apache/sling/discovery/oak/TopologyWebConsolePlugin.java
@@ -0,0 +1,1048 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+import javax.servlet.Servlet;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Properties;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.felix.webconsole.AbstractWebConsolePlugin;
+import org.apache.felix.webconsole.WebConsoleConstants;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.InstanceFilter;
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.base.commons.ClusterViewService;
+import org.apache.sling.discovery.base.connectors.announcement.Announcement;
+import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
+import org.apache.sling.discovery.base.connectors.announcement.CachedAnnouncement;
+import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry;
+import org.apache.sling.discovery.base.connectors.ping.TopologyConnectorClientInformation;
+import org.apache.sling.discovery.commons.providers.spi.base.DiscoveryLiteDescriptor;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.TopologyView;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple webconsole which gives an overview of the topology visible by the
+ * discovery service
+ */
+@Component
+@Service(value = { TopologyEventListener.class, Servlet.class })
+@Properties({
+ @Property(name=org.osgi.framework.Constants.SERVICE_DESCRIPTION,
+ value="Apache Sling Web Console Plugin to display Background servlets and ExecutionEngine status"),
+ @Property(name=WebConsoleConstants.PLUGIN_LABEL, value=TopologyWebConsolePlugin.LABEL),
+ @Property(name=WebConsoleConstants.PLUGIN_TITLE, value=TopologyWebConsolePlugin.TITLE),
+ @Property(name="felix.webconsole.configprinter.modes", value={"zip"})
+})
+@SuppressWarnings("serial")
+public class TopologyWebConsolePlugin extends AbstractWebConsolePlugin implements TopologyEventListener {
+
+ public static final String LABEL = "topology";
+ public static final String TITLE = "Topology Management";
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ /** the truncated log of topology events, filtered by property change types. shown in webconsole **/
+ private final List<String> propertyChangeLog = new LinkedList<String>();
+
+ /** the truncated log of topology events, shown in webconsole **/
+ private final List<String> topologyLog = new LinkedList<String>();
+
+ /** the date format used in the truncated log of topology events **/
+ private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
+
+ @Reference
+ private ClusterViewService clusterViewService;
+
+ @Reference
+ private AnnouncementRegistry announcementRegistry;
+
+ @Reference
+ private ConnectorRegistry connectorRegistry;
+
+ @Reference
+ protected ResourceResolverFactory resourceResolverFactory;
+
+ private TopologyView currentView;
+
+ private List<String> discoveryLiteHistory = new LinkedList<String>();
+
+ /**
+ * @return the plugin label, {@link #LABEL}
+ */
+ @Override
+ public String getLabel() {
+ return LABEL;
+ }
+
+ /**
+ * @return the plugin title, {@link #TITLE}
+ */
+ @Override
+ public String getTitle() {
+ return TITLE;
+ }
+
+ /**
+ * Component activation: only forwards to the superclass activate.
+ * @param bundleContext the bundle context handed on to the superclass
+ */
+ @Activate
+ @Override
+ public void activate(final BundleContext bundleContext) {
+ super.activate(bundleContext);
+ }
+
+ /**
+ * Component deactivation: only forwards to the superclass deactivate.
+ */
+ @Deactivate
+ @Override
+ public void deactivate() {
+ super.deactivate();
+ }
+
+ /**
+  * Renders the plugin content.
+  * <p>
+  * The part of the request URI after the plugin root selects the page:
+  * without any path segment the topology overview is rendered (or a hint
+  * that no view is available yet); otherwise the first path segment is
+  * taken as the sling id whose properties page is rendered.
+  *
+  * @param req the request, must carry WebConsoleConstants.ATTR_PLUGIN_ROOT as a String attribute
+  * @param res the response to write to
+  * @throws ServletException if the plugin root attribute is missing or not a String
+  * @throws IOException if the response writer cannot be obtained
+  */
+ @Override
+ protected void renderContent(final HttpServletRequest req, final HttpServletResponse res)
+         throws ServletException, IOException {
+     final Object rawRoot = req.getAttribute(WebConsoleConstants.ATTR_PLUGIN_ROOT);
+     if (!(rawRoot instanceof String)) {
+         throw new ServletException("Illegal attr: "
+                 + WebConsoleConstants.ATTR_PLUGIN_ROOT);
+     }
+
+     final String root = rawRoot.toString();
+     final String pathInfo = req.getRequestURI().substring(root.length());
+
+     final PrintWriter pw = res.getWriter();
+
+     // a path consisting only of slashes (e.g. a trailing "/") has no
+     // segment either and is treated like the empty path instead of
+     // failing with a NoSuchElementException in nextToken()
+     final StringTokenizer st = new StringTokenizer(pathInfo, "/");
+     if (!st.hasMoreTokens()) {
+         if ( this.currentView != null ) {
+             renderOverview(pw, currentView);
+         } else {
+             pw.println("<p class=\"statline ui-state-highlight\">No view available</p>");
+             pw.println("<br/>");
+             pw.println("No TOPOLOGY_INIT received yet, therefore no view available yet.");
+         }
+     } else {
+         final String nodeId = st.nextToken();
+         renderProperties(pw, req.getContextPath(), nodeId);
+     }
+ }
+
+ /**
+  * Render the properties page of a particular instance.
+  * <p>
+  * Looks up the instance with the given sling id in the current view and,
+  * if exactly one is found, prints its properties as an HTML table.
+  * Nothing is printed if there is no view or no unique match.
+  * The sling id, property keys and property values are HTML-escaped since
+  * they can originate from other instances in the topology.
+  *
+  * @param pw the writer to print to
+  * @param contextPath the context path of the request (currently unused)
+  * @param nodeId the sling id of the instance to render
+  */
+ private void renderProperties(final PrintWriter pw, final String contextPath, final String nodeId) {
+     if (logger.isDebugEnabled()) {
+         logger.debug("renderProperties: nodeId=" + nodeId);
+     }
+     final TopologyView tv = this.currentView;
+     final Set<InstanceDescription> instances;
+     if (tv == null) {
+         instances = Collections.<InstanceDescription>emptySet();
+     } else {
+         instances = tv.findInstances(new InstanceFilter() {
+
+             public boolean accept(InstanceDescription instance) {
+                 String slingId = instance.getSlingId();
+                 if (logger.isDebugEnabled()) {
+                     logger.debug("renderProperties/picks: slingId={}", slingId);
+                 }
+                 return (slingId.equals(nodeId));
+             }
+         });
+     }
+
+     if (instances == null || instances.size() != 1) {
+         // unknown or ambiguous sling id: nothing to render
+         return;
+     }
+
+     final InstanceDescription instance = instances.iterator().next();
+     pw.println("Properties of " + htmlEscape(instance.getSlingId()) + ":<br/>");
+
+     pw.println("<table class=\"adapters nicetable ui-widget tablesorter\">");
+     pw.println("<thead>");
+     pw.println("<tr>");
+     pw.println("<th class=\"header ui-widget-header\">Key</th>");
+     pw.println("<th class=\"header ui-widget-header\">Value</th>");
+     pw.println("</tr>");
+     pw.println("</thead>");
+     pw.println("<tbody>");
+     boolean odd = true;
+     for (final Entry<String, String> entry : instance.getProperties().entrySet()) {
+         final String oddEven = odd ? "odd" : "even";
+         odd = !odd;
+         pw.println("<tr class=\"" + oddEven + " ui-state-default\">");
+
+         pw.println("<td>" + htmlEscape(entry.getKey()) + "</td>");
+         pw.println("<td>" + htmlEscape(entry.getValue()) + "</td>");
+
+         pw.println("</tr>");
+     }
+     pw.println("</tbody>");
+     pw.println("</table>");
+ }
+
+ /**
+  * Escapes the characters &amp;, &lt;, &gt;, double and single quote so that
+  * the text can be embedded in HTML. A null text is rendered as "null",
+  * the same result plain string concatenation would give.
+  */
+ private static String htmlEscape(final String text) {
+     final String raw = String.valueOf(text);
+     final StringBuilder sb = new StringBuilder(raw.length() + 16);
+     for (int i = 0; i < raw.length(); i++) {
+         final char c = raw.charAt(i);
+         switch (c) {
+             case '&':
+                 sb.append("&amp;");
+                 break;
+             case '<':
+                 sb.append("&lt;");
+                 break;
+             case '>':
+                 sb.append("&gt;");
+                 break;
+             case '"':
+                 sb.append("&quot;");
+                 break;
+             case '\'':
+                 sb.append("&#39;");
+                 break;
+             default:
+                 sb.append(c);
+         }
+     }
+     return sb.toString();
+ }
+
+ /**
+ * Obtains an administrative resource resolver from the resourceResolverFactory.
+ * The caller is responsible for closing the returned resolver
+ * (renderOverview does so in a finally block).
+ * NOTE(review): this is an administrative login - consider whether a more
+ * restricted login would suffice for reading the descriptor.
+ *
+ * @return a new administrative resource resolver
+ * @throws LoginException if the administrative login fails
+ */
+ protected ResourceResolver getResourceResolver() throws LoginException {
+ return resourceResolverFactory.getAdministrativeResourceResolver(null);
+ }
+
+ /**
+  * Render the overview of the entire topology.
+  * <p>
+  * Prints, in this order: a link to the configuration, the table of all
+  * instances (local cluster first), the incoming and outgoing topology
+  * connectors, the discovery-lite descriptor history, the current
+  * discovery-lite descriptor value (read via a resource resolver that is
+  * closed again afterwards), the topology change history and the property
+  * change history.
+  *
+  * @param pw the writer to print to
+  * @param topology the view to render
+  */
+ private void renderOverview(final PrintWriter pw, final TopologyView topology) {
+     pw.println("<p class=\"statline ui-state-highlight\">Configuration</p>");
+     pw.println("<br/>");
+     pw.print("<a href=\"${appRoot}/configMgr/org.apache.sling.discovery.oak.Config\">Configure Discovery.Oak Service</a>");
+     pw.println("<br/>");
+     pw.println("<br/>");
+     final String changing;
+     if (!topology.isCurrent()) {
+         changing = " <b><i>CHANGING!</i> (the view is no longer current!)</b>";
+     } else {
+         changing = "";
+     }
+     pw.println("<p class=\"statline ui-state-highlight\">Topology"+changing+"</p>");
+     pw.println("<div class=\"ui-widget-header ui-corner-top buttonGroup\" style=\"height: 15px;\">");
+     pw.println("<span style=\"float: left; margin-left: 1em;\">Instances in the topology</span>");
+     pw.println("</div>");
+     pw.println("<table class=\"adapters nicetable ui-widget tablesorter\">");
+     pw.println("<thead>");
+     pw.println("<tr>");
+     pw.println("<th class=\"header ui-widget-header\">Sling id (click for properties)</th>");
+     pw.println("<th class=\"header ui-widget-header\">ClusterView id</th>");
+     pw.println("<th class=\"header ui-widget-header\">Local instance</th>");
+     pw.println("<th class=\"header ui-widget-header\">Leader instance</th>");
+     pw.println("<th class=\"header ui-widget-header\">In local cluster</th>");
+     pw.println("<th class=\"header ui-widget-header\">Announced by instance</th>");
+     pw.println("</tr>");
+     pw.println("</thead>");
+     pw.println("<tbody>");
+
+     final Set<ClusterView> clusters = topology.getClusterViews();
+     final ClusterView myCluster = topology.getLocalInstance().getClusterView();
+     // the local cluster always comes first, rows then alternate odd/even per cluster
+     boolean odd = true;
+     renderCluster(pw, myCluster, myCluster, odd, topology.isCurrent());
+
+     for (final ClusterView clusterView : clusters) {
+         if (clusterView.equals(myCluster)) {
+             // skip - I already rendered that
+             continue;
+         }
+         odd = !odd;
+         renderCluster(pw, clusterView, myCluster, odd, topology.isCurrent());
+     }
+
+     pw.println("</tbody>");
+     pw.println("</table>");
+
+     pw.println("<br/>");
+     pw.println("<br/>");
+     pw.println("<p class=\"statline ui-state-highlight\">Connectors</p>");
+     listIncomingTopologyConnectors(pw);
+     listOutgoingTopologyConnectors(pw);
+     pw.println("<br/>");
+
+     pw.println("<p class=\"statline ui-state-highlight\">Discovery-Lite Descriptor History</p>");
+     pw.println("<pre>");
+     for (final String discoLiteHistoryEntry : discoveryLiteHistory) {
+         pw.println(discoLiteHistoryEntry);
+     }
+     pw.println("</pre>");
+     pw.println("<br/>");
+     pw.println("<p class=\"statline ui-state-highlight\">Current Discovery-Lite Descriptor Value</p>");
+     pw.println("<pre>");
+     ResourceResolver resourceResolver = null;
+     try {
+         resourceResolver = getResourceResolver();
+         DiscoveryLiteDescriptor descriptor = DiscoveryLiteDescriptor.getDescriptorFrom(resourceResolver);
+         final String logEntry = getCurrentDateFormatted() + ": " + descriptor.getDescriptorStr();
+         pw.println(logEntry);
+     } catch(Exception e) {
+         // rendering must go on: report the problem in the page instead
+         logger.error("renderOverview: Exception: "+e, e);
+         pw.println("Got exception trying to get repository descriptor: "+e);
+     } finally {
+         if (resourceResolver != null) {
+             resourceResolver.close();
+         }
+     }
+     pw.println("</pre>");
+     pw.println("<br/>");
+
+     pw.println("<p class=\"statline ui-state-highlight\">Topology Change History</p>");
+     pw.println("<pre>");
+     for (final String aLogEntry : topologyLog) {
+         pw.println(aLogEntry);
+     }
+     pw.println("</pre>");
+     pw.println("<br/>");
+     pw.println("<p class=\"statline ui-state-highlight\">Property Change History</p>");
+     pw.println("<pre>");
+     for (final String aLogEntry : propertyChangeLog) {
+         pw.println(aLogEntry);
+     }
+     pw.println("</pre>");
+     pw.println("<br/>");
+ }
+
+ /**
+ * Render a particular cluster (into table rows)
+ */
+ private void renderCluster(final PrintWriter pw, final ClusterView renderCluster, final ClusterView localCluster, final boolean odd, final boolean current) {
+ final Collection<Announcement> announcements = announcementRegistry.listAnnouncementsInSameCluster(localCluster);
+
+ for (Iterator<InstanceDescription> it = renderCluster.getInstances()
+ .iterator(); it.hasNext();) {
+ final InstanceDescription instanceDescription = it.next();
+ final boolean inLocalCluster = renderCluster == localCluster;
+ Announcement parentAnnouncement = null;
+ for (Iterator<Announcement> it2 = announcements.iterator(); it2
+ .hasNext();) {
+ Announcement announcement = it2.next();
+ for (Iterator<InstanceDescription> it3 = announcement
+ .listInstances().iterator(); it3.hasNext();) {
+ InstanceDescription announcedInstance = it3.next();
+ if (announcedInstance.getSlingId().equals(
+ instanceDescription.getSlingId())) {
+ parentAnnouncement = announcement;
+ break;
+ }
+ }
+ }
+
+ final String oddEven = odd ? "odd" : "even";
+
+ if (current && (inLocalCluster || (parentAnnouncement!=null))) {
+ pw.println("<tr class=\"" + oddEven + " ui-state-default\">");
+ } else {
+ pw.println("<tr class=\"" + oddEven + " ui-state-error\">");
+ }
+ final boolean isLocal = instanceDescription.isLocal();
+ final String slingId = instanceDescription.getSlingId();
+ pw.print("<td>");
+ if ( isLocal) {
+ pw.print("<b>");
+ }
+ pw.print("<a href=\"");
+ pw.print(this.getLabel());
+ pw.print('/');
+ pw.print(slingId);
+ pw.print("\">");
+ pw.print(slingId);
+ pw.print("</a>");
+ if ( isLocal) {
+ pw.print("</b>");
+ }
+ pw.println("</td>");
+ pw.println("<td>"
+ + (instanceDescription.getClusterView() == null ? "null"
+ : instanceDescription.getClusterView().getId())
+ + "</td>");
+ pw.println("<td>" + (isLocal ? "<b>true</b>" : "false") + "</td>");
+ pw.println("<td>"
+ + (instanceDescription.isLeader() ? "<b>true</b>" : "false")
+ + "</td>");
+ if (inLocalCluster) {
+ pw.println("<td>local</td>");
+ pw.println("<td>n/a</td>");
+ } else {
+ pw.println("<td>remote</td>");
+ if (parentAnnouncement != null) {
+ pw.println("<td>" + parentAnnouncement.getOwnerId()
+ + "</td>");
+ } else {
+ pw.println("<td><b>(changing)</b></td>");
+ }
+ }
+ pw.println("</tr>");
+ }
+
+ }
+
+ /**
+ * Render the outgoing topology connectors - including the header-div and table
+ */
+ private void listOutgoingTopologyConnectors(final PrintWriter pw) {
+ boolean odd = false;
+ pw.println("<div class=\"ui-widget-header ui-corner-top buttonGroup\" style=\"height: 15px;\">");
+ pw.println("<span style=\"float: left; margin-left: 1em;\">Outgoing topology connectors</span>");
+ pw.println("</div>");
+ pw.println("<table class=\"adapters nicetable ui-widget tablesorter\">");
+ pw.println("<thead>");
+ pw.println("<tr>");
+ pw.println("<th class=\"header ui-widget-header\">Connector url</th>");
+ pw.println("<th class=\"header ui-widget-header\">Connected to slingId</th>");
+ pw.println("<th class=\"header ui-widget-header\">Connector status</th>");
+ pw.println("<th class=\"header ui-widget-header\">Last heartbeat </th>");
+ pw.println("<th class=\"header ui-widget-header\">Next heartbeat </th>");
+ pw.println("<th class=\"header ui-widget-header\">Request encoding </th>");
+ pw.println("<th class=\"header ui-widget-header\">Response encoding </th>");
+ // pw.println("<th class=\"header ui-widget-header\">Fallback connector urls</th>");
+ pw.println("</tr>");
+ pw.println("</thead>");
+ pw.println("<tbody>");
+
+ Collection<TopologyConnectorClientInformation> outgoingConnections = connectorRegistry
+ .listOutgoingConnectors();
+ for (Iterator<TopologyConnectorClientInformation> it = outgoingConnections
+ .iterator(); it.hasNext();) {
+ TopologyConnectorClientInformation topologyConnectorClient = it
+ .next();
+ final String oddEven = odd ? "odd" : "even";
+ odd = !odd;
+ final String remoteSlingId = topologyConnectorClient.getRemoteSlingId();
+ final boolean isConnected = topologyConnectorClient.isConnected() && remoteSlingId != null;
+ final boolean autoStopped = topologyConnectorClient.isAutoStopped();
+ final boolean representsLoop = topologyConnectorClient.representsLoop();
+ if (isConnected || autoStopped || representsLoop) {
+ pw.println("<tr class=\"" + oddEven + " ui-state-default\">");
+ } else {
+ pw.println("<tr class=\"" + oddEven + " ui-state-error\">");
+ }
+ pw.println("<td>"
+ + topologyConnectorClient.getConnectorUrl().toString()
+ + "</td>");
+ if (autoStopped) {
+ pw.println("<td><b>auto-stopped</b></td>");
+ pw.println("<td><b>auto-stopped due to local-loop</b></td>");
+ } else if (isConnected && !representsLoop) {
+ pw.println("<td>" + remoteSlingId + "</td>");
+ pw.println("<td>ok, in use</td>");
+ } else if (representsLoop) {
+ pw.println("<td>" + remoteSlingId + "</td>");
+ pw.println("<td>ok, unused (loop or duplicate): standby</td>");
+ } else {
+ final int statusCode = topologyConnectorClient.getStatusCode();
+ final String statusDetails = topologyConnectorClient.getStatusDetails();
+ final String tooltipText;
+ switch(statusCode) {
+ case HttpServletResponse.SC_UNAUTHORIZED:
+ tooltipText = HttpServletResponse.SC_UNAUTHORIZED +
+ ": possible setup issue of discovery.oak on target instance, or wrong URL";
+ break;
+ case HttpServletResponse.SC_NOT_FOUND:
+ tooltipText = HttpServletResponse.SC_NOT_FOUND +
+ ": possible white list rejection by target instance";
+ break;
+ case -1:
+ tooltipText = "-1: check error log. possible connection refused.";
+ break;
+ default:
+ tooltipText = null;
+ }
+ final String tooltip = tooltipText==null ? "" : (" title=\""+tooltipText+"\"");
+ pw.println("<td><b>not connected</b></td>");
+ pw.println("<td"+tooltip+"><b>not ok (HTTP Status-Code: "+statusCode+", "+statusDetails+")</b></td>");
+ }
+ pw.println("<td>"+beautifiedTimeDiff(topologyConnectorClient.getLastPingSent())+"</td>");
+ pw.println("<td>"+beautifiedDueTime(topologyConnectorClient.getNextPingDue())+"</td>");
+ pw.println("<td>"+topologyConnectorClient.getLastRequestEncoding()+"</td>");
+ pw.println("<td>"+topologyConnectorClient.getLastResponseEncoding()+"</td>");
+ // //TODO fallback urls are not yet implemented!
+ // String fallbackConnectorUrls;
+ // List<String> urls = topologyConnectorClient
+ // .listFallbackConnectorUrls();
+ // if (urls == null || urls.size() == 0) {
+ // fallbackConnectorUrls = "n/a";
+ // } else {
+ // fallbackConnectorUrls = "";
+ // for (Iterator<String> it2 = urls.iterator(); it2.hasNext();) {
+ // String aFallbackConnectorUrl = it2.next();
+ // fallbackConnectorUrls = fallbackConnectorUrls
+ // + aFallbackConnectorUrl + "<br/>";
+ // }
+ // }
+ // pw.println("<td>" + fallbackConnectorUrls + "</td>");
+ }
+
+ pw.println("</tbody>");
+ pw.println("</table>");
+ }
+
+ private String beautifiedDueTime(long secondsDue) {
+ if (secondsDue<-1) {
+ return "overdue";
+ } else if (secondsDue<=0) {
+ return "now-ish";
+ } else if (secondsDue==1) {
+ return "in 1 second";
+ } else {
+ int minsDue = (int) (secondsDue / 60);
+ if (minsDue<5) {
+ return "in "+secondsDue+" seconds";
+ } else {
+ return "in "+minsDue+" minutes";
+ }
+ }
+ }
+
+ /**
+  * Turns a timestamp into a short "... ago" text relative to the current system time.
+  *
+  * @param heartbeatTime the timestamp in milliseconds (compared against
+  *        {@link System#currentTimeMillis()}); values &lt;= 0 mean "never"
+  * @return "n/a" for values &lt;= 0, otherwise the elapsed time in millis (below one
+  *         second), seconds (below 5 minutes) or whole minutes
+  */
+ private String beautifiedTimeDiff(long heartbeatTime) {
+     if (heartbeatTime <= 0) {
+         return "n/a";
+     }
+     final long diff = System.currentTimeMillis() - heartbeatTime;
+     final long seconds = diff / 1000;
+     if (seconds == 0) {
+         return diff + " millis ago";
+     } else if (seconds == 1) {
+         return "1 second ago";
+     } else if (seconds < 300) {
+         // below 5 minutes: print seconds
+         return seconds + " seconds ago";
+     } else {
+         // 5 minutes or more: print whole minutes - always plural at this point
+         return (seconds / 60) + " minutes ago";
+     }
+ }
+
+ /**
+ * Render the incoming topology connectors - including the header-div and table
+ */
+ private void listIncomingTopologyConnectors(final PrintWriter pw) {
+ boolean odd = false;
+ pw.println("<div class=\"ui-widget-header ui-corner-top buttonGroup\" style=\"height: 15px;\">");
+ pw.println("<span style=\"float: left; margin-left: 1em;\">Incoming topology connectors</span>");
+ pw.println("</div>");
+ pw.println("<table class=\"adapters nicetable ui-widget tablesorter\">");
+ pw.println("<thead>");
+ pw.println("<tr>");
+ pw.println("<th class=\"header ui-widget-header\">Owner slingId</th>");
+ pw.println("<th class=\"header ui-widget-header\">Server info</th>");
+ pw.println("<th class=\"header ui-widget-header\">Last heartbeat</th>");
+ pw.println("<th class=\"header ui-widget-header\">Timeout</th>");
+ pw.println("</tr>");
+ pw.println("</thead>");
+ pw.println("<tbody>");
+
+ Collection<CachedAnnouncement> incomingConnections = announcementRegistry.listLocalIncomingAnnouncements();
+ for (Iterator<CachedAnnouncement> it = incomingConnections.iterator(); it
+ .hasNext();) {
+ CachedAnnouncement incomingCachedAnnouncement = it.next();
+ Announcement incomingAnnouncement = incomingCachedAnnouncement.getAnnouncement();
+ String oddEven = odd ? "odd" : "even";
+ odd = !odd;
+
+ pw.println("<tr class=\"" + oddEven + " ui-state-default\">");
+ pw.println("<td>" + incomingAnnouncement.getOwnerId() + "</td>");
+ if (incomingAnnouncement.getServerInfo() != null) {
+ pw.println("<td>" + incomingAnnouncement.getServerInfo()
+ + "</td>");
+ } else {
+ pw.println("<td><i>n/a</i></td>");
+ }
+ pw.println("<td>"+beautifiedTimeDiff(incomingCachedAnnouncement.getLastPing())+"</td>");
+ pw.println("<td>"+beautifiedDueTime(incomingCachedAnnouncement.getSecondsUntilTimeout())+"</td>");
+
+ pw.println("</tr>");
+ }
+
+ pw.println("</tbody>");
+ pw.println("</table>");
+ pw.println("<br/>");
+ pw.println("<br/>");
+ }
+
+ /**
+ * keep a truncated history of the log events for information purpose (to be shown in the webconsole)
+ * <p>
+ * Besides logging, this method updates {@code currentView}: to the event's old view for
+ * TOPOLOGY_CHANGING, to the event's new view for every other event type.
+ * <ul>
+ * <li>PROPERTIES_CHANGED: logs, per instance of the new view, the property differences
+ * against the same instance in the old view. If an instance of the new view cannot be
+ * found in the old view, an error is logged, only the event type is added to the log
+ * and the method returns early.</li>
+ * <li>TOPOLOGY_INIT: logs a short summary of the new view plus the slingIds of all
+ * instances, marking leaders.</li>
+ * <li>TOPOLOGY_CHANGING: logs a short summary of the old view.</li>
+ * <li>any other type: logs short summaries of old and new view, which instances joined
+ * or left, and the leader of the local cluster (if it can be determined).</li>
+ * </ul>
+ * Unless it returned early, the method finally records a discovery-lite history entry.
+ *
+ * @param event the topology event to record
+ */
+ public void handleTopologyEvent(final TopologyEvent event) {
+ if (event.getType() == Type.PROPERTIES_CHANGED) {
+ this.currentView = event.getNewView();
+
+ Set<InstanceDescription> newInstances = event.getNewView()
+ .getInstances();
+ StringBuilder sb = new StringBuilder();
+ for (Iterator<InstanceDescription> it = newInstances.iterator(); it
+ .hasNext();) {
+ final InstanceDescription newInstanceDescription = it.next();
+ InstanceDescription oldInstanceDescription = findInstance(
+ event.getOldView(), newInstanceDescription.getSlingId());
+ if (oldInstanceDescription == null) {
+ logger.error("handleTopologyEvent: got a property changed but did not find instance "
+ + newInstanceDescription
+ + " in oldview.. event="
+ + event);
+ addEventLog(event.getType(), event.getType().toString());
+ return; // note: leaves without recording a discovery-lite history entry
+ }
+
+ Map<String, String> oldProps = oldInstanceDescription
+ .getProperties();
+ Map<String, String> newProps = newInstanceDescription
+ .getProperties();
+ StringBuilder diff = diff(oldProps, newProps);
+ if (diff.length() > 0) {
+ if (sb.length() != 0) {
+ sb.append(", ");
+ }
+ sb.append("on instance "
+ + newInstanceDescription.getSlingId() + (newInstanceDescription.isLeader() ? " [isLeader]" : "")
+ + ": " + diff);
+ }
+ }
+
+ addEventLog(event.getType(), sb.toString());
+ } else if (event.getType() == Type.TOPOLOGY_INIT) {
+ this.currentView = event.getNewView();
+ StringBuilder details = new StringBuilder();
+ for (Iterator<InstanceDescription> it = event.getNewView()
+ .getInstances().iterator(); it.hasNext();) {
+ InstanceDescription newInstance = it.next();
+ if (details.length() != 0) {
+ details.append(", ");
+ }
+ details.append(newInstance.getSlingId());
+ if (newInstance.isLeader()) {
+ details.append(" [isLeader]");
+ }
+ }
+ addEventLog(event.getType(),
+ "view: " + shortViewInfo(event.getNewView()) + ". "
+ + details);
+ } else if (event.getType() == Type.TOPOLOGY_CHANGING) {
+ this.currentView = event.getOldView(); // while changing, the old view is what gets displayed
+ addEventLog(event.getType(),
+ "old view: " + shortViewInfo(event.getOldView()));
+ } else {
+ this.currentView = event.getNewView();
+ if (event.getOldView() == null) {
+ addEventLog(event.getType(),
+ "new view: " + shortViewInfo(event.getNewView()));
+ } else {
+ StringBuilder details = new StringBuilder();
+ for (Iterator<InstanceDescription> it = event.getNewView()
+ .getInstances().iterator(); it.hasNext();) {
+ InstanceDescription newInstance = it.next();
+ if (findInstance(event.getOldView(),
+ newInstance.getSlingId()) == null) { // in the new view only: joined
+ if (details.length() != 0) {
+ details.append(", ");
+ }
+ details.append(newInstance.getSlingId() + " joined");
+ }
+ }
+ for (Iterator<InstanceDescription> it = event.getOldView()
+ .getInstances().iterator(); it.hasNext();) {
+ InstanceDescription oldInstance = it.next();
+ if (findInstance(event.getNewView(),
+ oldInstance.getSlingId()) == null) { // in the old view only: left
+ if (details.length() != 0) {
+ details.append(", ");
+ }
+ details.append(oldInstance.getSlingId() + " left");
+ }
+ }
+ final InstanceDescription li = event.getNewView().getLocalInstance();
+ if (li!=null) {
+ ClusterView clusterView = li.getClusterView();
+ if (clusterView!=null) {
+ final InstanceDescription leader = clusterView.getLeader();
+ if (leader!=null) {
+ if (details.length() !=0) {
+ details.append(", ");
+ }
+ details.append("[isLeader: "+leader.getSlingId()+"]");
+ }
+ }
+ }
+
+ addEventLog(
+ event.getType(),
+ "old view: " + shortViewInfo(event.getOldView())
+ + ", new view: "
+ + shortViewInfo(event.getNewView()) + ". "
+ + details);
+ }
+ }
+ addDiscoveryLiteHistoryEntry();
+ }
+
+ /**
+ * find a particular instance in the topology
+ */
+ private InstanceDescription findInstance(final TopologyView view,
+ final String slingId) {
+ Set<InstanceDescription> foundInstances = view
+ .findInstances(new InstanceFilter() {
+
+ public boolean accept(InstanceDescription instance) {
+ return instance.getSlingId().equals(slingId);
+ }
+ });
+ if (foundInstances.size() == 1) {
+ return foundInstances.iterator().next();
+ } else {
+ return null;
+ }
+ }
+
+ /**
+  * Appends a timestamped entry to one of the two in-memory logs and drops the oldest
+  * entries so that the log holds at most 12 of them.
+  *
+  * @param type the event type; PROPERTIES_CHANGED entries go to the property change
+  *        log, all others to the topology log
+  * @param info the text appended after timestamp and type
+  */
+ private synchronized void addEventLog(final Type type, final String info) {
+     final int maxEntries = 12;
+     final String logEntry = getCurrentDateFormatted() + ": " + type + ". " + info;
+
+     if (type != Type.PROPERTIES_CHANGED) {
+         topologyLog.add(logEntry);
+         while (topologyLog.size() > maxEntries) {
+             topologyLog.remove(0);
+         }
+         return;
+     }
+
+     propertyChangeLog.add(logEntry);
+     while (propertyChangeLog.size() > maxEntries) {
+         propertyChangeLog.remove(0);
+     }
+ }
+
+ /**
+  * Reads the current discovery-lite descriptor and appends it, timestamped, to the
+  * descriptor history, keeping at most 12 entries.
+  * <p>
+  * Any exception is logged and otherwise ignored; the resource resolver obtained for
+  * the read is always closed.
+  */
+ private synchronized void addDiscoveryLiteHistoryEntry() {
+     ResourceResolver resolver = null;
+     try {
+         resolver = getResourceResolver();
+         final DiscoveryLiteDescriptor current = DiscoveryLiteDescriptor.getDescriptorFrom(resolver);
+         final String timestamp = getCurrentDateFormatted();
+
+         discoveryLiteHistory.add(timestamp + ": " + current.getDescriptorStr());
+         // truncate from the front, i.e. the oldest entries go first
+         while (discoveryLiteHistory.size() > 12) {
+             discoveryLiteHistory.remove(0);
+         }
+     } catch (Exception e) {
+         logger.error("addDiscoveryLiteHistoryEntry: Exception: "+e, e);
+     } finally {
+         if (resolver != null) {
+             resolver.close();
+         }
+     }
+ }
+
+ /**
+ * compile a short information string of the topology, including
+ * number of clusters and instances
+ */
+ private String shortViewInfo(final TopologyView view) {
+ int clusters = view.getClusterViews().size();
+ int instances = view.getInstances().size();
+ return ((clusters == 1) ? "1 cluster" : clusters + " clusters") + ", "
+ + ((instances == 1) ? "1 instance" : instances + " instances");
+ }
+
+ /**
+ * calculate the difference between two sets of properties
+ */
+ private StringBuilder diff(final Map<String, String> oldProps,
+ final Map<String, String> newProps) {
+ final Set<String> oldKeys = new HashSet<String>(oldProps.keySet());
+ final Set<String> newKeys = new HashSet<String>(newProps.keySet());
+
+ StringBuilder sb = new StringBuilder();
+
+ for (Iterator<String> it = oldKeys.iterator(); it.hasNext();) {
+ String oldKey = it.next();
+ if (newKeys.contains(oldKey)) {
+ if (oldProps.get(oldKey).equals(newProps.get(oldKey))) {
+ // perfect
+ } else {
+ sb.append("(" + oldKey + " changed from "
+ + oldProps.get(oldKey) + " to "
+ + newProps.get(oldKey) + ")");
+ }
+ newKeys.remove(oldKey);
+ } else {
+ sb.append("(" + oldKey + " was removed)");
+ }
+ it.remove();
+ }
+ for (Iterator<String> it = newKeys.iterator(); it.hasNext();) {
+ String newKey = it.next();
+ sb.append("(" + newKey + " was added)");
+ }
+
+ return sb;
+ }
+
+ /**
+  * Prints a plain-text version of the topology overview: the instances of all
+  * clusters (local cluster first), the incoming and outgoing topology connectors,
+  * the discovery-lite descriptor history and current value, and the topology and
+  * property change histories.
+  * <p>
+  * If no topology view has been received yet, only a corresponding note is printed.
+  *
+  * @param pw the writer to print to
+  */
+ public void printConfiguration( final PrintWriter pw ) {
+     final TopologyView topology = this.currentView;
+
+     pw.println(TITLE);
+     pw.println("---------------------------------------");
+     pw.println();
+     if ( topology == null ) {
+         pw.println("No topology available yet!");
+         return;
+     }
+     pw.print("Topology");
+     if (!topology.isCurrent()) {
+         pw.print(" CHANGING! (the view is no longer current!)");
+     }
+     pw.println();
+     pw.println();
+
+     final Set<ClusterView> clusters = topology.getClusterViews();
+     final ClusterView myCluster = topology.getLocalInstance().getClusterView();
+     printCluster(pw, myCluster, myCluster);
+
+     for (final ClusterView clusterView : clusters) {
+         if (clusterView.equals(myCluster)) {
+             // skip - I already rendered that
+             continue;
+         }
+         printCluster(pw, clusterView, myCluster);
+     }
+
+     pw.println();
+     pw.println();
+
+     final Collection<CachedAnnouncement> incomingConnections = announcementRegistry.listLocalIncomingAnnouncements();
+     if ( incomingConnections.size() > 0 ) {
+         pw.println("Incoming topology connectors");
+         pw.println("---------------------------------------");
+
+         for(final CachedAnnouncement incomingCachedAnnouncement : incomingConnections) {
+             Announcement incomingAnnouncement = incomingCachedAnnouncement.getAnnouncement();
+             pw.print("Owner Sling Id : ");
+             pw.print(incomingAnnouncement.getOwnerId());
+             pw.println();
+             if (incomingAnnouncement.getServerInfo() != null) {
+                 pw.print("Server Info : ");
+                 pw.print(incomingAnnouncement.getServerInfo());
+                 pw.println();
+             }
+             pw.println("Last heartbeat received : "+beautifiedTimeDiff(incomingCachedAnnouncement.getLastPing()));
+             pw.println("Timeout : "+beautifiedDueTime(incomingCachedAnnouncement.getSecondsUntilTimeout()));
+
+             pw.println();
+         }
+         pw.println();
+         pw.println();
+     }
+
+     final Collection<TopologyConnectorClientInformation> outgoingConnections = connectorRegistry.listOutgoingConnectors();
+     if ( outgoingConnections.size() > 0 ) {
+         pw.println("Outgoing topology connectors");
+         pw.println("---------------------------------------");
+
+         for(final TopologyConnectorClientInformation topologyConnectorClient : outgoingConnections) {
+             final String remoteSlingId = topologyConnectorClient.getRemoteSlingId();
+             final boolean autoStopped = topologyConnectorClient.isAutoStopped();
+             final boolean isConnected = topologyConnectorClient.isConnected() && remoteSlingId != null;
+             final boolean representsLoop = topologyConnectorClient.representsLoop();
+             pw.print("Connector URL : ");
+             pw.print(topologyConnectorClient.getConnectorUrl());
+             pw.println();
+
+             if (autoStopped) {
+                 pw.println("Connected to Sling Id : auto-stopped");
+                 pw.println("Connector status : auto-stopped due to local-loop");
+             } else if (isConnected && !representsLoop) {
+                 pw.print("Connected to Sling Id : ");
+                 pw.println(remoteSlingId);
+                 pw.println("Connector status : ok, in use");
+             } else if (representsLoop) {
+                 pw.print("Connected to Sling Id : ");
+                 pw.println(remoteSlingId);
+                 pw.println("Connector status : ok, unused (loop or duplicate): standby");
+             } else {
+                 final int statusCode = topologyConnectorClient.getStatusCode();
+                 final String statusDetails = topologyConnectorClient.getStatusDetails();
+                 final String tooltipText;
+                 switch(statusCode) {
+                 case HttpServletResponse.SC_UNAUTHORIZED:
+                     tooltipText = HttpServletResponse.SC_UNAUTHORIZED +
+                         ": possible setup issue of discovery.oak on target instance, or wrong URL";
+                     break;
+                 case HttpServletResponse.SC_NOT_FOUND:
+                     tooltipText = HttpServletResponse.SC_NOT_FOUND +
+                         ": possible white list rejection by target instance";
+                     break;
+                 case -1:
+                     tooltipText = "-1: check error log. possible connection refused.";
+                     break;
+                 default:
+                     tooltipText = null;
+                 }
+                 pw.println("Connected to Sling Id : not connected");
+                 pw.print("Connector status : not ok");
+                 if ( tooltipText != null ) {
+                     pw.print(" (");
+                     pw.print(tooltipText);
+                     pw.print(")");
+                 }
+                 pw.print(" (HTTP StatusCode: "+statusCode+", "+statusDetails+")");
+                 pw.println();
+             }
+             // heartbeat information applies to every connector, not only to failing
+             // ones - this mirrors what the html rendering of the same data shows
+             pw.println("Last heartbeat sent : "+beautifiedTimeDiff(topologyConnectorClient.getLastPingSent()));
+             pw.println("Next heartbeat due : "+beautifiedDueTime(topologyConnectorClient.getNextPingDue()));
+             pw.println();
+         }
+         pw.println();
+         pw.println();
+     }
+
+     ResourceResolver resourceResolver = null;
+     pw.println("Discovery-Lite Descriptor History");
+     pw.println("---------------------------------------");
+     for (String discoLiteHistoryEntry : discoveryLiteHistory) {
+         pw.println(discoLiteHistoryEntry);
+     }
+     pw.println();
+     pw.println();
+     pw.println("Current Discovery-Lite Descriptor Value");
+     pw.println("---------------------------------------");
+     try{
+         resourceResolver = getResourceResolver();
+         DiscoveryLiteDescriptor descriptor = DiscoveryLiteDescriptor.getDescriptorFrom(resourceResolver);
+         final String logEntry = getCurrentDateFormatted() + ": " + descriptor.getDescriptorStr();
+         pw.println(logEntry);
+         pw.println();
+         pw.println();
+     } catch(Exception e) {
+         logger.error("printConfiguration: Exception: "+e, e);
+         pw.println("Got exception trying to get repository descriptor: "+e);
+         pw.println();
+         pw.println();
+     } finally {
+         if (resourceResolver != null) {
+             resourceResolver.close();
+         }
+     }
+
+     if ( topologyLog.size() > 0 ) {
+         pw.println("Topology Change History");
+         pw.println("---------------------------------------");
+         for(final String aLogEntry : topologyLog) {
+             pw.println(aLogEntry);
+         }
+         pw.println();
+         pw.println();
+     }
+
+     if ( propertyChangeLog.size() > 0 ) {
+         pw.println("Property Change History");
+         pw.println("---------------------------------------");
+         for(final String aLogEntry : propertyChangeLog) {
+             pw.println(aLogEntry);
+         }
+         pw.println();
+     }
+ }
+
+ private String getCurrentDateFormatted() {
+ return sdf.format(Calendar.getInstance().getTime());
+ }
+
+ /**
+  * Prints a plain-text section per instance of the given cluster: ids, local/leader
+  * flags, whether the instance is in the local cluster, which instance announced it,
+  * and all its properties.
+  *
+  * @param pw the writer to print to
+  * @param renderCluster the cluster whose instances are printed
+  * @param localCluster the local cluster; also used to look up the announcements
+  */
+ private void printCluster(final PrintWriter pw, final ClusterView renderCluster, final ClusterView localCluster) {
+     final Collection<Announcement> announcements = announcementRegistry.listAnnouncementsInSameCluster(localCluster);
+     // compared by identity, not by equals()
+     final boolean inLocalCluster = renderCluster == localCluster;
+
+     for (final InstanceDescription instanceDescription : renderCluster.getInstances()) {
+         final String slingId = instanceDescription.getSlingId();
+         final boolean isLocal = instanceDescription.isLocal();
+
+         // the break only ends the inner scan, hence the last matching announcement wins
+         Announcement parentAnnouncement = null;
+         for (final Announcement announcement : announcements) {
+             for (final InstanceDescription announcedInstance : announcement.listInstances()) {
+                 if (announcedInstance.getSlingId().equals(slingId)) {
+                     parentAnnouncement = announcement;
+                     break;
+                 }
+             }
+         }
+
+         final ClusterView instanceCluster = instanceDescription.getClusterView();
+         final String announcedBy;
+         if (inLocalCluster) {
+             announcedBy = "n/a";
+         } else if (parentAnnouncement != null) {
+             announcedBy = parentAnnouncement.getOwnerId();
+         } else {
+             announcedBy = "(changing)";
+         }
+
+         pw.println("Sling ID : " + slingId);
+         pw.println("Cluster View ID : " + (instanceCluster == null ? "null" : instanceCluster.getId()));
+         pw.println("Local instance : " + isLocal);
+         pw.println("Leader instance : " + instanceDescription.isLeader());
+         pw.println("In local cluster : " + (inLocalCluster ? "local" : "remote"));
+         pw.println("Announced by : " + announcedBy);
+
+         pw.println("Properties:");
+         for (final Map.Entry<String, String> property : instanceDescription.getProperties().entrySet()) {
+             pw.println("- " + property.getKey() + " : " + property.getValue());
+         }
+         pw.println();
+         pw.println();
+     }
+ }
+}
diff --git a/src/main/java/org/apache/sling/discovery/oak/cluster/OakClusterViewService.java b/src/main/java/org/apache/sling/discovery/oak/cluster/OakClusterViewService.java
new file mode 100644
index 0000000..ce55cc2
--- /dev/null
+++ b/src/main/java/org/apache/sling/discovery/oak/cluster/OakClusterViewService.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak.cluster;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.base.commons.ClusterViewService;
+import org.apache.sling.discovery.base.commons.UndefinedClusterViewException;
+import org.apache.sling.discovery.base.commons.UndefinedClusterViewException.Reason;
+import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription;
+import org.apache.sling.discovery.commons.providers.spi.LocalClusterView;
+import org.apache.sling.discovery.commons.providers.spi.base.DiscoveryLiteDescriptor;
+import org.apache.sling.discovery.commons.providers.spi.base.IdMapService;
+import org.apache.sling.discovery.oak.Config;
+import org.apache.sling.settings.SlingSettingsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Oak-based implementation of the ClusterViewService interface.
+ * <p>
+ * The local cluster view is derived from the discovery-lite descriptor: its active
+ * cluster node ids are mapped to slingIds via the {@link IdMapService}, ordered by
+ * the "leaderElectionId" stored below the configured cluster instances path (the
+ * first one being the leader), and enriched with the properties stored there.
+ */
+@Component
+@Service(value = ClusterViewService.class)
+public class OakClusterViewService implements ClusterViewService {
+
+    private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+    @Reference
+    private SlingSettingsService settingsService;
+
+    @Reference
+    private ResourceResolverFactory resourceResolverFactory;
+
+    @Reference
+    private Config config;
+
+    @Reference
+    private IdMapService idMapService;
+
+    /**
+     * Creates an instance with its references set explicitly instead of injected;
+     * judging by the name this is meant for tests.
+     */
+    public static OakClusterViewService testConstructor(SlingSettingsService settingsService,
+            ResourceResolverFactory resourceResolverFactory,
+            IdMapService idMapService,
+            Config config) {
+        OakClusterViewService service = new OakClusterViewService();
+        service.settingsService = settingsService;
+        service.resourceResolverFactory = resourceResolverFactory;
+        service.config = config;
+        service.idMapService = idMapService;
+        return service;
+    }
+
+    /**
+     * @return the local slingId, or {@code null} if the settings service is not available
+     */
+    public String getSlingId() {
+        if (settingsService==null) {
+            return null;
+        }
+        return settingsService.getSlingId();
+    }
+
+    /**
+     * @return a new administrative resource resolver; the caller must close it
+     * @throws LoginException if the resolver cannot be obtained
+     */
+    protected ResourceResolver getResourceResolver() throws LoginException {
+        return resourceResolverFactory.getAdministrativeResourceResolver(null);
+    }
+
+    /**
+     * Reads the discovery-lite descriptor and converts it into the local cluster view.
+     *
+     * @return the local cluster view
+     * @throws UndefinedClusterViewException if no view can be determined at this time;
+     *         any unexpected exception is reported with reason REPOSITORY_EXCEPTION
+     */
+    public LocalClusterView getLocalClusterView() throws UndefinedClusterViewException {
+        logger.trace("getLocalClusterView: start");
+        ResourceResolver resourceResolver = null;
+        try{
+            resourceResolver = getResourceResolver();
+            DiscoveryLiteDescriptor descriptor =
+                    DiscoveryLiteDescriptor.getDescriptorFrom(resourceResolver);
+            return asClusterView(descriptor, resourceResolver);
+        } catch (UndefinedClusterViewException e) {
+            logger.info("getLocalClusterView: undefined clusterView: "+e.getReason()+" - "+e.getMessage());
+            throw e;
+        } catch (Exception e) {
+            logger.error("getLocalClusterView: repository exception: "+e, e);
+            throw new UndefinedClusterViewException(Reason.REPOSITORY_EXCEPTION, "Exception while processing descriptor: "+e);
+        } finally {
+            logger.trace("getLocalClusterView: end");
+            if (resourceResolver!=null) {
+                resourceResolver.close();
+            }
+        }
+    }
+
+    /**
+     * Converts a discovery-lite descriptor into a cluster view.
+     *
+     * @throws UndefinedClusterViewException with NO_ESTABLISHED_VIEW if the descriptor
+     *         is not final, lists no active ids, or an active id has no slingId or no
+     *         leaderElectionId yet; with ISOLATED_FROM_TOPOLOGY if the resulting view
+     *         does not contain the local instance
+     */
+    private LocalClusterView asClusterView(DiscoveryLiteDescriptor descriptor, ResourceResolver resourceResolver) throws Exception {
+        if (descriptor == null) {
+            throw new IllegalArgumentException("descriptor must not be null");
+        }
+        if (resourceResolver==null) {
+            throw new IllegalArgumentException("resourceResolver must not be null");
+        }
+        logger.trace("asClusterView: start");
+        String clusterViewId = descriptor.getViewId();
+        String localClusterSyncTokenId = descriptor.getViewId()+"_"+descriptor.getSeqNum();
+        if (!descriptor.isFinal()) {
+            throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, "descriptor is not yet final: "+descriptor);
+        }
+        LocalClusterView cluster = new LocalClusterView(clusterViewId, localClusterSyncTokenId);
+        long me = descriptor.getMyId();
+        int[] activeIds = descriptor.getActiveIds();
+        if (activeIds==null || activeIds.length==0) {
+            throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW, "Descriptor contained no active ids: "+descriptor.getDescriptorStr());
+        }
+        // convert int[] to List<Integer>
+        List<Integer> activeIdsList = new LinkedList<Integer>();
+        for (int activeId : activeIds) {
+            activeIdsList.add(activeId);
+        }
+
+        // step 1: sort activeIds by their leaderElectionId
+        // serves two purposes: pos[0] is then leader
+        // and the rest are properly sorted within the cluster
+        final Map<Integer, String> leaderElectionIds = new HashMap<Integer, String>();
+        for (Integer id : activeIdsList) {
+            String slingId = idMapService.toSlingId(id, resourceResolver);
+            if (slingId == null) {
+                throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
+                        "no slingId mapped for clusterNodeId="+id);
+            }
+            String leaderElectionId = getLeaderElectionId(resourceResolver,
+                    slingId);
+            if (leaderElectionId == null) {
+                // without a leaderElectionId the instance cannot be ordered (the sort
+                // below would fail with a NullPointerException) - report the view as
+                // not established instead
+                throw new UndefinedClusterViewException(Reason.NO_ESTABLISHED_VIEW,
+                        "no leaderElectionId available yet for slingId="+slingId);
+            }
+            leaderElectionIds.put(id, leaderElectionId);
+        }
+
+        Collections.sort(activeIdsList, new Comparator<Integer>() {
+
+            @Override
+            public int compare(Integer arg0, Integer arg1) {
+                return leaderElectionIds.get(arg0)
+                        .compareTo(leaderElectionIds.get(arg1));
+            }
+        });
+
+        // step 2: create the instances - iterating (rather than get(i)) keeps this
+        // linear on the linked list
+        boolean isLeader = true; // thx to sorting above the first one is leader indeed
+        for (Integer activeId : activeIdsList) {
+            int id = activeId;
+            boolean isOwn = id==me;
+            String slingId = idMapService.toSlingId(id, resourceResolver);
+            if (slingId==null) {
+                logger.info("asClusterView: cannot resolve oak-clusterNodeId {} to a slingId", id);
+                throw new Exception("Cannot resolve oak-clusterNodeId "+id+" to a slingId");
+            }
+            Map<String, String> properties = readProperties(slingId, resourceResolver);
+            // create a new instance (adds itself to the cluster in the constructor)
+            new DefaultInstanceDescription(cluster, isLeader, isOwn, slingId, properties);
+            isLeader = false;
+        }
+        logger.trace("asClusterView: returning {}", cluster);
+        InstanceDescription local = cluster.getLocalInstance();
+        if (local != null) {
+            return cluster;
+        } else {
+            logger.info("getClusterView: the local instance ("+getSlingId()+") is currently not included in the existing established view! "
+                    + "This is normal at startup. At other times is pseudo-network-partitioning is an indicator for repository/network-delays or clocks-out-of-sync (SLING-3432). "
+                    + "(increasing the heartbeatTimeout can help as a workaround too) "
+                    + "The local instance will stay in TOPOLOGY_CHANGING or pre _INIT mode until a new vote was successful.");
+            throw new UndefinedClusterViewException(Reason.ISOLATED_FROM_TOPOLOGY,
+                    "established view does not include local instance - isolated");
+        }
+    }
+
+    /**
+     * Reads the "leaderElectionId" property of the given instance.
+     *
+     * @return the leaderElectionId, or {@code null} if the instance's resource does not
+     *         exist, cannot be adapted to a ValueMap, or has no such property
+     */
+    private String getLeaderElectionId(ResourceResolver resourceResolver, String slingId) {
+        if (slingId==null) {
+            throw new IllegalStateException("slingId must not be null");
+        }
+        final String myClusterNodePath = config.getClusterInstancesPath()+"/"+slingId;
+        final Resource myClusterNode = resourceResolver.getResource(myClusterNodePath);
+        if (myClusterNode == null) {
+            return null;
+        }
+        final ValueMap resourceMap = myClusterNode.adaptTo(ValueMap.class);
+        if (resourceMap == null) {
+            return null;
+        }
+        return resourceMap.get("leaderElectionId", String.class);
+    }
+
+    /**
+     * Reads the properties stored in the "properties" child of the given instance's
+     * resource, skipping "jcr:primaryType".
+     *
+     * @return the properties; empty if the resource, its child or its ValueMap is missing
+     */
+    private Map<String, String> readProperties(String slingId, ResourceResolver resourceResolver) {
+        Resource res = resourceResolver.getResource(
+                config.getClusterInstancesPath() + "/"
+                        + slingId);
+        final Map<String, String> props = new HashMap<String, String>();
+        if (res != null) {
+            final Resource propertiesChild = res.getChild("properties");
+            if (propertiesChild != null) {
+                final ValueMap properties = propertiesChild.adaptTo(ValueMap.class);
+                if (properties != null) {
+                    for (Iterator<String> it = properties.keySet().iterator(); it
+                            .hasNext();) {
+                        String key = it.next();
+                        if (!key.equals("jcr:primaryType")) {
+                            props.put(key, properties.get(key, String.class));
+                        }
+                    }
+                }
+            }
+        }
+        return props;
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/discovery/oak/pinger/OakViewChecker.java b/src/main/java/org/apache/sling/discovery/oak/pinger/OakViewChecker.java
new file mode 100644
index 0000000..cf50d26
--- /dev/null
+++ b/src/main/java/org/apache/sling/discovery/oak/pinger/OakViewChecker.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak.pinger;
+
+import java.util.Calendar;
+import java.util.UUID;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.base.commons.BaseViewChecker;
+import org.apache.sling.discovery.base.connectors.BaseConfig;
+import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
+import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry;
+import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
+import org.apache.sling.discovery.oak.Config;
+import org.apache.sling.discovery.oak.OakDiscoveryService;
+import org.apache.sling.launchpad.api.StartupListener;
+import org.apache.sling.launchpad.api.StartupMode;
+import org.apache.sling.settings.SlingSettingsService;
+import org.osgi.service.http.HttpService;
+
+/**
+ * The OakViewChecker is taking care of checking the oak discovery-lite
+ * descriptor when checking the local cluster view and passing that
+ * on to the ViewStateManager which will then detect whether there was
+ * any change or not. Unlike discovery.impl's HeartbeatHandler this one
+ * does not store any heartbeats in the repository anymore.
+ * <p>
+ * Remote heartbeats are POSTs to remote TopologyConnectorServlets using
+ * discovery.base
+ */
+@Component
+@Service(value = { OakViewChecker.class, StartupListener.class })
+@Reference(referenceInterface=HttpService.class,
+ cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
+ policy=ReferencePolicy.DYNAMIC)
+public class OakViewChecker extends BaseViewChecker {
+
+ @Reference
+ protected SlingSettingsService slingSettingsService;
+
+ @Reference
+ protected ResourceResolverFactory resourceResolverFactory;
+
+ @Reference
+ protected ConnectorRegistry connectorRegistry;
+
+ @Reference
+ protected AnnouncementRegistry announcementRegistry;
+
+ @Reference
+ protected Scheduler scheduler;
+
+ @Reference
+ private Config config;
+
+ private OakDiscoveryService discoveryService;
+
+    /**
+     * for testing only: creates an OakViewChecker and assigns the given
+     * collaborators directly to the fields that are otherwise annotated
+     * with @Reference
+     **/
+    public static OakViewChecker testConstructor(
+            SlingSettingsService slingSettingsService,
+            ResourceResolverFactory resourceResolverFactory,
+            ConnectorRegistry connectorRegistry,
+            AnnouncementRegistry announcementRegistry,
+            Scheduler scheduler,
+            Config config) {
+        final OakViewChecker checker = new OakViewChecker();
+        checker.config = config;
+        checker.scheduler = scheduler;
+        checker.announcementRegistry = announcementRegistry;
+        checker.connectorRegistry = connectorRegistry;
+        checker.resourceResolverFactory = resourceResolverFactory;
+        checker.slingSettingsService = slingSettingsService;
+        return checker;
+    }
+
+ @Override
+ protected AnnouncementRegistry getAnnouncementRegistry() { // accessor override: hands out the @Reference-annotated announcementRegistry field
+ return announcementRegistry;
+ }
+
+ @Override
+ protected BaseConfig getConnectorConfig() { // accessor override: the discovery.oak Config is handed out under the BaseConfig type
+ return config;
+ }
+
+ @Override
+ protected ConnectorRegistry getConnectorRegistry() { // accessor override: hands out the @Reference-annotated connectorRegistry field
+ return connectorRegistry;
+ }
+
+ @Override
+ protected ResourceResolverFactory getResourceResolverFactory() { // accessor override: hands out the @Reference-annotated resourceResolverFactory field
+ return resourceResolverFactory;
+ }
+
+ @Override
+ protected Scheduler getScheduler() { // accessor override: hands out the @Reference-annotated scheduler field
+ return scheduler;
+ }
+
+ @Override
+ protected SlingSettingsService getSlingSettingsService() { // accessor override: hands out the @Reference-annotated slingSettingsService field
+ return slingSettingsService;
+ }
+
+ @Override
+ protected void doActivate() {
+ // on activate a fresh runtimeId is generated and resetLeaderElectionId()
+ // is invoked right away to store a new 'leaderElectionId' property.
+ // the idea being that a node which leaves the cluster should not
+ // become leader on next join - and by resetting the leaderElectionId
+ // to the current time, this is ensured.
+ runtimeId = UUID.randomUUID().toString();
+
+ logger.info("doActivate: activated with runtimeId: {}, slingId: {}", runtimeId, slingId);
+
+ resetLeaderElectionId();
+ }
+
+ @Override
+ public void startupFinished(StartupMode mode) { // presumably the StartupListener callback (this class is registered as a StartupListener service) - delegates to the superclass first
+ super.startupFinished(mode);
+
+ synchronized(lock) {
+ if (activated) {
+ // only reset if activated - this stores a leaderElectionId that carries the time of this call
+ resetLeaderElectionId();
+ }
+ }
+
+ }
+
+ /**
+ * The initialize method is called by the OakDiscoveryService.activate
+ * as we require the discoveryService (and the discoveryService has
+ * a reference on us - but we cant have circular references in osgi).
+ */
+ public void initialize(final OakDiscoveryService discoveryService) {
+ logger.info("initialize: initializing.");
+ synchronized(lock) {
+ this.discoveryService = discoveryService;
+ issueHeartbeat();
+ }
+
+ // start the (less frequent) periodic job that does the
+ // connector pings and checks the connector/topology view
+ try {
+ final long interval = config.getConnectorPingInterval();
+ logger.info("initialize: starting periodic connectorPing job for "+slingId+" with interval "+interval+" sec.");
+ scheduler.addPeriodicJob(NAME+".connectorPinger", this,
+ null, interval, false);
+ } catch (Exception e) {
+ logger.error("activate: Could not start heartbeat runner: " + e, e);
+ }
+
+ // start the (more frequent) periodic job that checks
+ // the discoveryLite descriptor - that can be more frequent
+ // since it is only reading an oak repository descriptor
+ // which is designed to be read very frequently (it caches
+ // the value and only updates it on change, so reading is very cheap)
+ // and because doing this more frequently means that the
+ // reaction time is faster
+ try{
+ final long interval = config.getDiscoveryLiteCheckInterval();
+ logger.info("initialize: starting periodic discoveryLiteCheck job for "+slingId+" with interval "+interval+" sec.");
+ scheduler.addPeriodicJob(NAME+".discoveryLiteCheck", new Runnable() {
+
+ @Override
+ public void run() {
+ discoveryLiteCheck();
+ }
+
+ },
+ null, interval, false);
+ } catch (Exception e) {
+ logger.error("activate: Could not start heartbeat runner: " + e, e);
+ }
+ }
+
+    /**
+     * Body of the periodic discoveryLiteCheck job: if this component is
+     * activated, the discoveryService is asked to handle a potential
+     * topology change - otherwise nothing is checked.
+     */
+    private void discoveryLiteCheck() {
+        logger.debug("discoveryLiteCheck: start. [for slingId="+slingId+"]");
+        synchronized(lock) {
+            if (activated) {
+                // check the view:
+                // oak updates its discovery-lite descriptor on its own whenever
+                // the cluster view changes, so here we can only assume that
+                // something might have changed - it is up to the
+                // discoveryService/viewStateManager to filter out
+                // the 99.99% of unchanged cases.
+                discoveryService.handlePotentialTopologyChange();
+            } else {
+                // SLING-2895: avoid checks if not activated
+                logger.debug("discoveryLiteCheck: not activated yet");
+                return;
+            }
+        }
+        logger.debug("discoveryLiteCheck: end. [for slingId="+slingId+"]");
+    }
+
+ /** Logs in an administrative ResourceResolver via the resourceResolverFactory - returns null (after logging an error) if the factory is not set. Closing the returned resolver is left to the caller (resetLeaderElectionId does so in its finally block). **/
+ private ResourceResolver getResourceResolver() throws LoginException {
+ if (resourceResolverFactory == null) {
+ logger.error("getResourceResolver: resourceResolverFactory is null!");
+ return null;
+ }
+ return resourceResolverFactory.getAdministrativeResourceResolver(null);
+ }
+
+ /** Calculate the path of the local instance's resource: the configured cluster instances path followed by '/' and the local slingId **/
+ private String getLocalClusterNodePath() {
+ return config.getClusterInstancesPath() + "/" + slingId;
+ }
+
+    /**
+     * Stores a freshly calculated leaderElectionId - together with the
+     * runtimeId, the sling home path and the endpoints of this instance -
+     * on the local instance's resource below the cluster instances path
+     * and commits the change.
+     *
+     * @return true if the new leaderElectionId was stored and committed -
+     * false if that was not possible (no resource resolver available, the
+     * resource could not be adapted for modification, or the login or the
+     * commit failed)
+     */
+    public boolean resetLeaderElectionId() {
+        ResourceResolver resourceResolver = null;
+        try{
+            final String myClusterNodePath = getLocalClusterNodePath();
+            resourceResolver = getResourceResolver();
+            if (resourceResolver==null) {
+                logger.warn("resetLeaderElectionId: could not login, new leaderElectionId will be calculated upon next heartbeat only!");
+                return false;
+            }
+            final String newLeaderElectionId = newLeaderElectionId();
+
+            final Resource resource = ResourceHelper.getOrCreateResource(
+                    resourceResolver, myClusterNodePath);
+            final ModifiableValueMap resourceMap = resource.adaptTo(ModifiableValueMap.class);
+            if (resourceMap==null) {
+                // adaptTo is allowed to return null - fail explicitly instead of with an NPE
+                logger.error("resetLeaderElectionId: could not adapt " + myClusterNodePath + " to a ModifiableValueMap");
+                return false;
+            }
+
+            resourceMap.put(PROPERTY_ID_RUNTIME, runtimeId);
+            // SLING-4765 : store more infos to be able to be more verbose on duplicate slingId/ghost detection
+            final String slingHomePath = slingSettingsService==null ? "n/a" : slingSettingsService.getSlingHomePath();
+            resourceMap.put(PROPERTY_ID_SLING_HOME_PATH, slingHomePath);
+            final String endpointsAsString = getEndpointsAsString();
+            resourceMap.put(PROPERTY_ID_ENDPOINTS, endpointsAsString);
+
+            final Calendar leaderElectionCreatedAt = Calendar.getInstance();
+            resourceMap.put("leaderElectionId", newLeaderElectionId);
+            resourceMap.put("leaderElectionIdCreatedAt", leaderElectionCreatedAt);
+
+            // one placeholder per argument - the leaderElectionId and its creation
+            // time used to be passed along but were never part of the message
+            logger.info("resetLeaderElectionId: storing my runtimeId: {}, endpoints: {}, sling home path: {}, new leaderElectionId: {}, created at: {}",
+                    new Object[]{runtimeId, endpointsAsString, slingHomePath, newLeaderElectionId, leaderElectionCreatedAt.getTime()});
+            resourceResolver.commit();
+            return true;
+        } catch (LoginException e) {
+            logger.error("resetLeaderElectionid: could not login: "+e, e);
+        } catch (PersistenceException e) {
+            logger.error("resetLeaderElectionid: got PersistenceException: "+e, e);
+        } finally {
+            if (resourceResolver!=null) {
+                resourceResolver.close();
+            }
+        }
+        // only reached if one of the exceptions above was caught:
+        // the leaderElectionId was not (reliably) stored
+        return false;
+    }
+
+    /**
+     * Calculate a new leaderElectionId of the form
+     * <code>1_&lt;zero-padded current time millis&gt;_&lt;slingId&gt;</code>.
+     * <p>
+     * The timestamp is formatted with Locale.ROOT so that it consists of
+     * ASCII digits independent of the default locale of the JVM.
+     *
+     * @return the new leaderElectionId
+     */
+    private String newLeaderElectionId() {
+        // pad to the number of digits of Long.MAX_VALUE so that the
+        // timestamp part always has the same length
+        final int maxLongLength = String.valueOf(Long.MAX_VALUE).length();
+        final String currentTimeMillisStr = String.format(java.util.Locale.ROOT,
+                "%0" + maxLongLength + "d", System.currentTimeMillis());
+
+        final String prefix = "1";
+
+        return prefix + "_" + currentTimeMillisStr + "_" + slingId;
+    }
+
+ @Override
+ protected void doCheckView() { // runs the inherited check first, then asks the discoveryService to handle a potential topology change
+ super.doCheckView();
+
+ // discovery.oak relies on oak's discovery-lite descriptor
+ // to be updated independently in case of cluster view change.
+ // all that we can therefore do here is assume something
+ // might have changed and let discoveryService/viewStateManager
+ // filter out the 99.99% of unchanged cases.
+ discoveryService.handlePotentialTopologyChange(); // NOTE(review): no null-check here, unlike updateProperties() - presumably only invoked after initialize(); confirm
+ }
+
+ protected void updateProperties() { // delegates to discoveryService.updateProperties() - only logs an error if no discoveryService has been set
+ if (discoveryService == null) {
+ logger.error("issueHeartbeat: discoveryService is null"); // NOTE(review): prefix says 'issueHeartbeat' although this is updateProperties - presumably a leftover; confirm
+ } else {
+ discoveryService.updateProperties();
+ }
+ }}
diff --git a/src/main/resources/OSGI-INF/metatype/metatype.properties b/src/main/resources/OSGI-INF/metatype/metatype.properties
new file mode 100644
index 0000000..de9301f
--- /dev/null
+++ b/src/main/resources/OSGI-INF/metatype/metatype.properties
@@ -0,0 +1,121 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# This file contains localization strings for configuration labels and
+# descriptions as used in the metatype.xml descriptor generated by the
+# SCR plugin
+config.name=Apache Sling Oak-Based Discovery Service Configuration
+config.description = The configuration of the Oak based discovery service implementation.
+
+connectorPingTimeout.name = Connector Ping timeout (seconds)
+connectorPingTimeout.description = Configure the timeout (in seconds) after which an announcement \
+ sent via a topology connector is considered timed out. Default is 120 seconds.
+
+connectorPingInterval.name = Connector Ping interval (seconds)
+connectorPingInterval.description = Configure the interval (in seconds) according to which the \
+ topology connector pings are exchanged in the topology. Default is 30 seconds.
+
+discoveryLiteCheckInterval.name = Discovery-Lite Check interval (seconds)
+discoveryLiteCheckInterval.description = Configure the interval (in seconds) with which Oak's \
+ discoveryLite descriptor should be checked for changes. Default is 2 seconds. \
+ Note that the timeout value is configured within Oak directly.
+
+minEventDelay.name = Minimal Event Delay (seconds)
+minEventDelay.description = Configure a minimal delay (in seconds) between TOPOLOGY_CHANGING \
+ and TOPOLOGY_CHANGED. Any further changes happening during this delay are accumulated and \
+ combined in the TOPOLOGY_CHANGED after this delay. This helps avoiding event-flooding. \
+ Default is 3 seconds. A negative value or zero disables this delay.
+
+topologyConnectorUrls.name = Topology Connector URLs
+topologyConnectorUrls.description = URLs where to join a topology, e.g. \
+ http://localhost:4502/libs/sling/topology/connector
+
+topologyConnectorWhitelist.name = Topology Connector Whitelist
+topologyConnectorWhitelist.description = List of IPs and/or hostnames which are allowed to \
+ connect to the connector URL. There are four variants here: 1. provide a plain hostname. \
+ 2. provide an IP address. 3. provide a hostname or IP address with wildcards (* or ?). \
+ 4. provide an IP address with a subnet mask, either using the CIDR notation: 1.2.3.4/24 \
+ or an IP address, space, subnet mask: 1.2.3.4 255.255.255.0)
+
+discoveryResourcePath.name = Discovery Resource Path
+discoveryResourcePath.description = Path of resource where to keep discovery information. \
+ The default is /var/discovery/oak.
+
+leaderElectionRepositoryDescriptor.name = Repository Descriptor Name
+leaderElectionRepositoryDescriptor.description = Name of the repository descriptor to be taken \
+ into account for leader election: those instances have preference to become leader which have \
+ the corresponding descriptor value of 'false'.
+
+invertRepositoryDescriptor.name = Invert Repository Descriptor
+invertRepositoryDescriptor.description = Enabling this property allows to invert the \
+ repository descriptor value that is obtained via the configured 'leaderElectionRepositoryDescriptor' \
+ (thus only applies if that is configured). Default is 'false' (don't invert).
+
+autoStopLocalLoopEnabled.name = Auto-Stop Local-Loops
+autoStopLocalLoopEnabled.description = If true, and the discovery service detects a local-looping \
+ topology connector, the corresponding topology connector will be automatically stopped. \
+ This is useful to prevent unnecessary loops with eg pre-configured topology connectors.
+
+gzipConnectorRequestsEnabled.name = gzip requests
+gzipConnectorRequestsEnabled.description = If true, the payloads of topology connector requests \
+ will be gzipped. This is advisable on certain connector structures, eg in a tree structure, where \
+ a topology connector announces a large sub-topology. Note that this only works with \
+ the server running discovery.impl 1.0.4 and onwards. Replies are gzipped automatically.
+
+socketConnectTimeout.name = connector's socket.connect() timeout
+socketConnectTimeout.description = Timeout (in seconds!) for the topology connector's \
+ socket.connect()
+
+soTimeout.name = connector's read timeout
+soTimeout.description = Topology connector's socket timeout (SO_TIMEOUT) (in seconds!) which is \
+ the timeout for waiting for data
+
+hmacEnabled.name = Enable Hmac message signatures
+hmacEnabled.description = If true, and the Shared Key is set to the same value on all members of the \
+ topology, the messages will be validated using a HMAC of a digest of the body of the message. \
+ The hmac and message digest are in the HTTP request and response headers. Both requests and responses \
+ are signed.
+
+enableEncryption.name = Enable Message encryption
+enableEncryption.description = If Message HMACs are enabled and there is a shared key set, setting this to \
+ true will encrypt the body of the message using 128 bit AES encryption. Once encrypted you will not be able \
+ to debug the messages at the http level.
+
+sharedKey.name = Message shared key.
+sharedKey.description = If message signing and encryption is used, this should be set to the same value \
+ on all members of the same topology. If any member of the topology has a different key it will effectively \
+ be excluded from the topology even if it attempts to send messages to other members of the topology.
+
+hmacSharedKeyTTL.name = Shared Key TTL
+hmacSharedKeyTTL.description = Shared keys for message signatures are derived from the configured shared key. \
+ Each derived key has a lifetime (TTL). Once that time has expired a new key is derived and used for hmac signatures. \
+ This setting sets the TTL in ms. Keys that are 2 lifetimes old are ignored. Set according to your level of paranoia, \
+ but don't set to less than the greatest possible clock drift between members of the topology. The default is 4 hours. Setting \
+ to a ridiculously low value will increase the turnover of keys. Generating a key takes about 2ms. There is no risk of \
+ memory consumption with low values, only a risk of the topology falling apart due to incorrectly set clocks.
+
+backoffStableFactor.name = Backoff factor for stable connectors
+backoffStableFactor.description = When a topology connector is stable (ie no changes occurring in the announcements sent), \
+ then the heartbeat frequency is lowered, ie the heartbeatInterval for this connector is steadily increased, at maximum by the \
+ backoffStableFactor.
+
+backoffStandbyFactor.name = Backoff factor for standby connectors
+backoffStandbyFactor.description = When a topology connector is in standby mode (ie when it is redundant), the heartbeat \
+ frequency is lowered, ie the heartbeatInterval for this connector is increased, at maximum by the backoffStandbyFactor
diff --git a/src/test/java/org/apache/sling/discovery/oak/OakDiscoveryServiceTest.java b/src/test/java/org/apache/sling/discovery/oak/OakDiscoveryServiceTest.java
new file mode 100644
index 0000000..7affca5
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/oak/OakDiscoveryServiceTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.UUID;
+
+import org.apache.sling.discovery.commons.providers.base.DummyListener;
+import org.apache.sling.discovery.commons.providers.spi.base.DescriptorHelper;
+import org.apache.sling.discovery.commons.providers.spi.base.DiscoveryLiteConfig;
+import org.apache.sling.discovery.commons.providers.spi.base.DiscoveryLiteDescriptorBuilder;
+import org.apache.sling.discovery.commons.providers.spi.base.DummySlingSettingsService;
+import org.apache.sling.discovery.commons.providers.spi.base.IdMapService;
+import org.apache.sling.discovery.oak.its.setup.OakVirtualInstanceBuilder;
+import org.junit.Test;
+
+/**
+ * Tests the OakDiscoveryService with regard to TopologyEventListeners
+ * that are bound and unbound before as well as after activate.
+ */
+public class OakDiscoveryServiceTest {
+
+    /**
+     * DiscoveryLiteConfig with fixed sync-token and id-map paths and a
+     * background interval/timeout that is handed in via the constructor.
+     */
+    public final class SimpleCommonsConfig implements DiscoveryLiteConfig {
+
+        private final long bgIntervalMillis;
+        private final long bgTimeoutMillis;
+
+        SimpleCommonsConfig(long bgIntervalMillis, long bgTimeoutMillis) {
+            this.bgIntervalMillis = bgIntervalMillis;
+            this.bgTimeoutMillis = bgTimeoutMillis;
+        }
+
+        @Override
+        public String getSyncTokenPath() {
+            return "/var/synctokens";
+        }
+
+        @Override
+        public String getIdMapPath() {
+            return "/var/idmap";
+        }
+
+        @Override
+        public long getBgTimeoutMillis() {
+            return bgTimeoutMillis;
+        }
+
+        @Override
+        public long getBgIntervalMillis() {
+            return bgIntervalMillis;
+        }
+
+    }
+
+    /**
+     * Binds/unbinds a listener many times before activate and verifies that
+     * it receives no event until a final discovery-lite descriptor is set,
+     * then exactly one event, and one more after being bound again.
+     */
+    @Test
+    public void testBindBeforeActivate() throws Exception {
+        final OakVirtualInstanceBuilder instanceBuilder =
+                (OakVirtualInstanceBuilder) new OakVirtualInstanceBuilder()
+                .setDebugName("test")
+                .newRepository("/foo/bar", true);
+        final String slingId = UUID.randomUUID().toString();
+
+        final DiscoveryLiteDescriptorBuilder descriptorBuilder = new DiscoveryLiteDescriptorBuilder();
+        descriptorBuilder.id("id").me(1).activeIds(1);
+        // the descriptor starts out as not final - otherwise the view
+        // would be set earlier than this test wants it to be
+        descriptorBuilder.setFinal(false);
+        DescriptorHelper.setDiscoveryLiteDescriptor(instanceBuilder.getResourceResolverFactory(),
+                descriptorBuilder);
+
+        final IdMapService idMapService = IdMapService.testConstructor(
+                new SimpleCommonsConfig(1000, -1),
+                new DummySlingSettingsService(slingId),
+                instanceBuilder.getResourceResolverFactory());
+        assertTrue(idMapService.waitForInit(2000));
+
+        final OakDiscoveryService service = (OakDiscoveryService) instanceBuilder.getDiscoverService();
+        assertNotNull(service);
+
+        final DummyListener dummyListener = new DummyListener();
+        for (int round = 0; round < 100; round++) {
+            service.bindTopologyEventListener(dummyListener);
+            service.unbindTopologyEventListener(dummyListener);
+        }
+        service.bindTopologyEventListener(dummyListener);
+        assertEquals(0, dummyListener.countEvents());
+
+        service.activate(null);
+        assertEquals(0, dummyListener.countEvents());
+
+        // some more confusion...
+        service.unbindTopologyEventListener(dummyListener);
+        service.bindTopologyEventListener(dummyListener);
+
+        // the final flag is only set now: this way handlePotentialTopologyChange
+        // gets to see a valid new, different view and sends out an event -
+        // which is exactly what is wanted here
+        descriptorBuilder.setFinal(true);
+        DescriptorHelper.setDiscoveryLiteDescriptor(instanceBuilder.getResourceResolverFactory(),
+                descriptorBuilder);
+        service.handlePotentialTopologyChange();
+        assertTrue(service.getViewStateManager().waitForAsyncEvents(2000));
+        assertEquals(1, dummyListener.countEvents());
+
+        service.unbindTopologyEventListener(dummyListener);
+        assertEquals(1, dummyListener.countEvents());
+
+        service.bindTopologyEventListener(dummyListener);
+        assertTrue(service.getViewStateManager().waitForAsyncEvents(2000));
+        assertEquals(2, dummyListener.countEvents()); // should now have gotten an INIT too
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/discovery/oak/its/OakClusterLoadTest.java b/src/test/java/org/apache/sling/discovery/oak/its/OakClusterLoadTest.java
new file mode 100644
index 0000000..5ce0636
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/oak/its/OakClusterLoadTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak.its;
+
+import org.apache.sling.discovery.base.its.AbstractClusterLoadTest;
+import org.apache.sling.discovery.base.its.setup.VirtualInstanceBuilder;
+import org.apache.sling.discovery.oak.its.setup.OakVirtualInstanceBuilder;
+
+public class OakClusterLoadTest extends AbstractClusterLoadTest { // discovery.oak variant of AbstractClusterLoadTest - only provides the builder
+
+ @Override
+ public VirtualInstanceBuilder newBuilder() { // supplies an OakVirtualInstanceBuilder - presumably used by the inherited tests to create their instances
+ return new OakVirtualInstanceBuilder();
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/discovery/oak/its/OakClusterTest.java b/src/test/java/org/apache/sling/discovery/oak/its/OakClusterTest.java
new file mode 100644
index 0000000..1d4e5df
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/oak/its/OakClusterTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak.its;
+
+import org.apache.sling.discovery.base.its.AbstractClusterTest;
+import org.apache.sling.discovery.base.its.setup.VirtualInstanceBuilder;
+import org.apache.sling.discovery.oak.its.setup.OakVirtualInstanceBuilder;
+
+public class OakClusterTest extends AbstractClusterTest { // discovery.oak variant of AbstractClusterTest - only provides the builder
+
+ @Override
+ protected VirtualInstanceBuilder newBuilder() { // supplies an OakVirtualInstanceBuilder - presumably used by the inherited tests to create their instances
+ return new OakVirtualInstanceBuilder();
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/discovery/oak/its/OakSingleInstanceTest.java b/src/test/java/org/apache/sling/discovery/oak/its/OakSingleInstanceTest.java
new file mode 100644
index 0000000..c0e5499
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/oak/its/OakSingleInstanceTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak.its;
+
+import org.apache.sling.discovery.base.its.AbstractSingleInstanceTest;
+import org.apache.sling.discovery.base.its.setup.VirtualInstanceBuilder;
+import org.apache.sling.discovery.oak.its.setup.OakVirtualInstanceBuilder;
+
+public class OakSingleInstanceTest extends AbstractSingleInstanceTest { // discovery.oak variant of AbstractSingleInstanceTest - only provides the builder
+
+ @Override
+ protected VirtualInstanceBuilder newBuilder() { // supplies an OakVirtualInstanceBuilder - presumably used by the inherited tests to create their instances
+ return new OakVirtualInstanceBuilder();
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/discovery/oak/its/OakTopologyEventTest.java b/src/test/java/org/apache/sling/discovery/oak/its/OakTopologyEventTest.java
new file mode 100644
index 0000000..3b033a2
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/oak/its/OakTopologyEventTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak.its;
+
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.base.its.AbstractTopologyEventTest;
+import org.apache.sling.discovery.base.its.setup.VirtualInstanceBuilder;
+import org.apache.sling.discovery.oak.its.setup.OakVirtualInstanceBuilder;
+
+public class OakTopologyEventTest extends AbstractTopologyEventTest { // discovery.oak variant of AbstractTopologyEventTest
+
+ @Override
+ public VirtualInstanceBuilder newBuilder() { // supplies an OakVirtualInstanceBuilder - presumably used by the inherited tests to create their instances
+ return new OakVirtualInstanceBuilder();
+ }
+
+ @Override
+ public void assertEarlyAndFirstClusterViewIdMatches(TopologyView earlyTopo, TopologyView secondTopo) { // despite the name, only non-null is asserted in this variant
+ // for the oak discovery-lite variant there's nothing we can assert here
+ // except perhaps that they shouldn't be null..
+ assertNotNull(earlyTopo);
+ assertNotNull(secondTopo);
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/discovery/oak/its/setup/OakTestConfig.java b/src/test/java/org/apache/sling/discovery/oak/its/setup/OakTestConfig.java
new file mode 100644
index 0000000..f84251b
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/oak/its/setup/OakTestConfig.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak.its.setup;
+
+import org.apache.sling.discovery.base.its.setup.ModifiableTestBaseConfig;
+import org.apache.sling.discovery.oak.Config;
+
+/**
+ * A {@link Config} whose values can be changed by tests through the
+ * {@link ModifiableTestBaseConfig} setters.
+ */
+public class OakTestConfig extends Config implements ModifiableTestBaseConfig {
+
+    public OakTestConfig() {
+        // nothing to initialize
+    }
+
+    /** Sets the discovery resource path used by this config. */
+    public void setDiscoveryResourcePath(String discoveryResourcePath) {
+        this.discoveryResourcePath = discoveryResourcePath;
+    }
+
+    /** Sets the minimum event delay. */
+    public void setMinEventDelay(int minEventDelay) {
+        this.minEventDelay = minEventDelay;
+    }
+
+    /**
+     * Appends one entry to the topology connector whitelist, creating the
+     * whitelist if there is none yet.
+     */
+    public void addTopologyConnectorWhitelistEntry(String whitelistEntry) {
+        final String[] previous = topologyConnectorWhitelist;
+        if (previous == null) {
+            topologyConnectorWhitelist = new String[] {whitelistEntry};
+            return;
+        }
+        // grow by one slot and put the new entry at the end
+        final String[] extended = new String[previous.length + 1];
+        System.arraycopy(previous, 0, extended, 0, previous.length);
+        extended[previous.length] = whitelistEntry;
+        topologyConnectorWhitelist = extended;
+    }
+
+    /** @return always 1 */
+    @Override
+    public int getBackoffStableFactor() {
+        return 1;
+    }
+
+    /** @return always 1 */
+    @Override
+    public int getBackoffStandbyFactor() {
+        return 1;
+    }
+
+    /** Sets the connector ping interval. */
+    public void setConnectorInterval(long connectorInterval) {
+        this.connectorPingInterval = connectorInterval;
+    }
+
+    /** Sets the connector ping timeout. */
+    public void setConnectorTimeout(long connectorTimeout) {
+        this.connectorPingTimeout = connectorTimeout;
+    }
+
+    /** Delegates to {@link #setConnectorTimeout(long)}. */
+    @Override
+    public void setViewCheckTimeout(int viewCheckTimeout) {
+        setConnectorTimeout(viewCheckTimeout);
+    }
+
+    /** Delegates to {@link #setConnectorInterval(long)}. */
+    @Override
+    public void setViewCheckInterval(int viewCheckInterval) {
+        setConnectorInterval(viewCheckInterval);
+    }
+
+    /** @return the connector ping timeout, which doubles as the view checker timeout */
+    public long getViewCheckerTimeout() {
+        return connectorPingTimeout;
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/discovery/oak/its/setup/OakVirtualInstanceBuilder.java b/src/test/java/org/apache/sling/discovery/oak/its/setup/OakVirtualInstanceBuilder.java
new file mode 100644
index 0000000..2080c0e
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/oak/its/setup/OakVirtualInstanceBuilder.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak.its.setup;
+
+import static org.junit.Assert.fail;
+
+import javax.jcr.Session;
+
+import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
+import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.discovery.base.commons.BaseDiscoveryService;
+import org.apache.sling.discovery.base.commons.ClusterViewService;
+import org.apache.sling.discovery.base.commons.ViewChecker;
+import org.apache.sling.discovery.base.its.setup.ModifiableTestBaseConfig;
+import org.apache.sling.discovery.base.its.setup.VirtualInstance;
+import org.apache.sling.discovery.base.its.setup.VirtualInstanceBuilder;
+import org.apache.sling.discovery.base.its.setup.mock.MockFactory;
+import org.apache.sling.discovery.commons.providers.spi.base.IdMapService;
+import org.apache.sling.discovery.commons.providers.spi.base.OakSyncTokenConsistencyService;
+import org.apache.sling.discovery.commons.providers.spi.base.RepositoryTestHelper;
+import org.apache.sling.discovery.oak.OakDiscoveryService;
+import org.apache.sling.discovery.oak.cluster.OakClusterViewService;
+import org.apache.sling.discovery.oak.pinger.OakViewChecker;
+import org.apache.sling.jcr.api.SlingRepository;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import junitx.util.PrivateAccessor;
+
+public class OakVirtualInstanceBuilder extends VirtualInstanceBuilder {
+
+ NodeStore nodeStore;
+ private String path;
+ private OakTestConfig config;
+ private IdMapService idMapService;
+ private OakViewChecker oakViewChecker;
+ private SimulatedLeaseCollection leaseCollection;
+ private OakSyncTokenConsistencyService consistencyService;
+
+ @Override
+ public VirtualInstanceBuilder createNewRepository() throws Exception {
+ nodeStore = new MemoryNodeStore();
+ SlingRepository repository = RepositoryTestHelper.newOakRepository(nodeStore);
+ factory = MockFactory.mockResourceResolverFactory(repository);
+ leaseCollection = new SimulatedLeaseCollection();
+ return this;
+ }
+
+ @Override
+ public VirtualInstanceBuilder useRepositoryOf(VirtualInstanceBuilder other) throws Exception {
+ if (!(other instanceof OakVirtualInstanceBuilder)) {
+ throw new IllegalArgumentException("other must be of type OakVirtualInstanceBuilder but is: "+other);
+ }
+ OakVirtualInstanceBuilder otherOakbuilder = (OakVirtualInstanceBuilder)other;
+ nodeStore = otherOakbuilder.nodeStore;
+ SlingRepository repository = RepositoryTestHelper.newOakRepository(nodeStore);
+ factory = MockFactory.mockResourceResolverFactory(repository);
+ leaseCollection = otherOakbuilder.leaseCollection;
+ hookedToBuilder = other;
+ ownRepository = false;
+ return this;
+ }
+
+ @Override
+ public VirtualInstanceBuilder setPath(String path) {
+ this.path = path;
+ return this;
+ }
+
+ @Override
+ public Object[] getAdditionalServices(VirtualInstance instance) throws Exception {
+ return null;
+ }
+
+ public IdMapService getIdMapService() {
+ if (idMapService==null) {
+ idMapService = createIdMapService();
+ }
+ return idMapService;
+ }
+
+ private IdMapService createIdMapService() {
+ return IdMapService.testConstructor(getConfig(), getSlingSettingsService(), getResourceResolverFactory());
+ }
+
+ @Override
+ protected ClusterViewService createClusterViewService() {
+ return OakClusterViewService.testConstructor(getSlingSettingsService(), getResourceResolverFactory(), getIdMapService(), getConfig());
+ }
+
+ OakTestConfig getConfig() {
+ if (config==null) {
+ config = createConfig();
+ }
+ return config;
+ }
+
+ @Override
+ public ModifiableTestBaseConfig getConnectorConfig() {
+ return getConfig();
+ }
+
+ private OakTestConfig createConfig() {
+ OakTestConfig c = new OakTestConfig();
+ c.setDiscoveryResourcePath(path);
+ return c;
+ }
+
+ @Override
+ protected ViewChecker createViewChecker() throws Exception {
+ getOakViewChecker();
+ return new ViewChecker() {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private SimulatedLease lease = new SimulatedLease(getResourceResolverFactory(), leaseCollection, getSlingId());
+
+ protected void activate(ComponentContext c) throws Throwable {
+ OakViewChecker pinger = getOakViewChecker();
+ PrivateAccessor.invoke(pinger, "activate", new Class[] {ComponentContext.class}, new Object[] {c});
+ }
+
+ @Override
+ public void checkView() {
+ try {
+ lease.updateDescriptor(getConfig());
+ } catch (Exception e) {
+ logger.error("run: could not update lease: "+e);
+ }
+ }
+
+ public void run() {
+ heartbeatAndCheckView();
+ }
+
+ @Override
+ public void heartbeatAndCheckView() {
+// next step is try to simulate the logic
+// where no heartbeat means no descriptor yet
+// one heartbeat means i'm visible for others
+// as soon as I see others the descriptor is updated
+ try {
+ lease.updateLeaseAndDescriptor(getConfig());
+ } catch (Exception e) {
+ logger.error("run: could not update lease: "+e, e);
+ }
+ try{
+ getOakViewChecker().run();
+ } catch(Exception e) {
+ logger.error("run: could not ping: "+e, e);
+ }
+ if (!getIdMapService().isInitialized()) {
+ if (!getIdMapService().waitForInit(1500)) {
+ fail("init didnt work");
+ }
+ }
+ }
+ };
+ }
+
+ private OakViewChecker getOakViewChecker() throws Exception {
+ if (oakViewChecker==null) {
+ oakViewChecker = createOakViewChecker() ;
+ }
+ return oakViewChecker;
+ }
+
+ private OakViewChecker createOakViewChecker() throws Exception {
+ return OakViewChecker.testConstructor(getSlingSettingsService(), getResourceResolverFactory(), getConnectorRegistry(), getAnnouncementRegistry(), getScheduler(), getConfig());
+ }
+
+ private OakSyncTokenConsistencyService getConsistencyService() throws Exception {
+ if (consistencyService == null) {
+ consistencyService = createConsistencyService();
+ }
+ return consistencyService;
+ }
+
+ private OakSyncTokenConsistencyService createConsistencyService() {
+ return OakSyncTokenConsistencyService.testConstructorAndActivate(getConfig(), getIdMapService(), getSlingSettingsService(), getResourceResolverFactory());
+ }
+
+ @Override
+ protected BaseDiscoveryService createDiscoveryService() throws Exception {
+ return OakDiscoveryService.testConstructor(
+ getSlingSettingsService(),
+ getAnnouncementRegistry(),
+ getConnectorRegistry(),
+ getClusterViewService(),
+ getConfig(),
+ getOakViewChecker(),
+ getScheduler(),
+ getIdMapService(),
+ getConsistencyService(),
+ getResourceResolverFactory());
+ }
+
+ @Override
+ public VirtualInstance build() throws Exception {
+ if (path==null) {
+ if (ownRepository) {
+ setPath("/var/discovery/impl/");
+ getConfig().setDiscoveryResourcePath("/var/discovery/impl/");
+ } else {
+ OakVirtualInstanceBuilder other = (OakVirtualInstanceBuilder) hookedToBuilder;
+ this.path = other.path;
+ getConfig().setDiscoveryResourcePath(other.path);
+ }
+ }
+ if (path==null) {
+ throw new IllegalStateException("no path set");
+ }
+ if (!path.startsWith("/")) {
+ throw new IllegalStateException("path must start with /: "+path);
+ }
+ if (!path.endsWith("/")) {
+ throw new IllegalStateException("path must end with /: "+path);
+ }
+ VirtualInstance result = new VirtualInstance(this) {
+
+ };
+ return result;
+ }
+
+ @Override
+ protected void resetRepo() throws Exception {
+ leaseCollection.reset();
+ ResourceResolver rr = null;
+ Session l = null;
+ try {
+ rr = factory.getAdministrativeResourceResolver(null);
+ l = rr.adaptTo(Session.class);
+ l.removeItem("/var");
+ l.save();
+ l.logout();
+ } catch (Exception e) {
+ l.refresh(false);
+ l.logout();
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sling/discovery/oak/its/setup/SimulatedLease.java b/src/test/java/org/apache/sling/discovery/oak/its/setup/SimulatedLease.java
new file mode 100644
index 0000000..aa49818
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/oak/its/setup/SimulatedLease.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak.its.setup;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.discovery.commons.providers.spi.base.DescriptorHelper;
+import org.apache.sling.discovery.commons.providers.spi.base.DiscoveryLiteDescriptorBuilder;
+
+/**
+ * The lease of one instance (identified by its sling id) within a
+ * {@link SimulatedLeaseCollection}. Both update methods ask the collection
+ * for a descriptor builder and hand it to
+ * {@link DescriptorHelper#setDiscoveryLiteDescriptor}.
+ */
+public class SimulatedLease {
+
+    private final SimulatedLeaseCollection collection;
+    private final ResourceResolverFactory factory;
+    private final String slingId;
+
+    /**
+     * Creates the lease and registers it with the given collection.
+     *
+     * @param factory passed on to DescriptorHelper when a descriptor is set
+     * @param collection the collection this lease belongs to
+     * @param slingId the sling id of the instance owning this lease
+     */
+    public SimulatedLease(ResourceResolverFactory factory,
+            SimulatedLeaseCollection collection,
+            String slingId) {
+        this.factory = factory;
+        this.collection = collection;
+        this.slingId = slingId;
+        // register last, so that the collection never sees a lease whose
+        // fields are not assigned yet
+        collection.hooked(this);
+    }
+
+    @Override
+    public String toString() {
+        return "a SimulatedLease[slingId="+slingId+"]";
+    }
+
+    /** @return the sling id this lease was created with */
+    public String getSlingId() {
+        return slingId;
+    }
+
+    /**
+     * Sets the descriptor as currently computed by the collection, without
+     * renewing this lease.
+     *
+     * @param config passed on to the collection
+     * @throws Exception whatever the collection or DescriptorHelper throw
+     */
+    public void updateDescriptor(OakTestConfig config) throws Exception {
+        DiscoveryLiteDescriptorBuilder builder = collection.getDescriptorFor(this, config);
+        DescriptorHelper.setDiscoveryLiteDescriptor(factory, builder);
+    }
+
+    /**
+     * Renews this lease in the collection and sets the resulting descriptor.
+     *
+     * @param config passed on to the collection
+     * @throws Exception whatever the collection or DescriptorHelper throw
+     */
+    public void updateLeaseAndDescriptor(OakTestConfig config) throws Exception {
+        DiscoveryLiteDescriptorBuilder builder = collection.updateAndGetDescriptorFor(this, config);
+        DescriptorHelper.setDiscoveryLiteDescriptor(factory, builder);
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/discovery/oak/its/setup/SimulatedLeaseCollection.java b/src/test/java/org/apache/sling/discovery/oak/its/setup/SimulatedLeaseCollection.java
new file mode 100644
index 0000000..eed9568
--- /dev/null
+++ b/src/test/java/org/apache/sling/discovery/oak/its/setup/SimulatedLeaseCollection.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.oak.its.setup;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.sling.discovery.commons.providers.spi.base.DiscoveryLiteDescriptorBuilder;
+import org.apache.sling.discovery.oak.Config;
+
+/**
+ * Keeps track of the last lease update per sling id and derives
+ * discovery-lite descriptors from that: an instance whose last update is
+ * older than the configured timeout is reported as inactive, all others as
+ * active.
+ * <p>
+ * All methods that access the internal maps are synchronized on this object.
+ */
+public class SimulatedLeaseCollection {
+
+    /** sling id -> System.currentTimeMillis() of the last lease update */
+    private final Map<String,Long> leaseUpdates =
+            new HashMap<String, Long>();
+
+    /** sling id -> cluster node id as handed out by getClusterNodeId */
+    private final Map<String,Integer> clusterNodeIds =
+            new HashMap<String, Integer>();
+
+    /** the highest cluster node id handed out so far */
+    private int highestId = 0;
+
+    /** random id chosen once per collection and used for every descriptor */
+    private final String viewId = UUID.randomUUID().toString();
+
+    /** all leases that registered themselves via hooked() */
+    List<SimulatedLease> leases = new LinkedList<SimulatedLease>();
+
+    public SimulatedLeaseCollection() {
+        // empty
+    }
+
+    /** Registers a lease with this collection. */
+    public synchronized void hooked(SimulatedLease lease) {
+        leases.add(lease);
+    }
+
+    /**
+     * Computes the descriptor as seen by the given lease without renewing it.
+     *
+     * @param simulatedLease the lease on whose behalf the descriptor is built
+     * @param config provides the timeout used to tell active from inactive
+     * @return a new descriptor builder
+     */
+    public synchronized DiscoveryLiteDescriptorBuilder getDescriptorFor(SimulatedLease simulatedLease, OakTestConfig config) {
+        return doUpdateAndGet(simulatedLease, config, false);
+    }
+
+    /**
+     * Renews the given lease and computes the descriptor as seen by it.
+     *
+     * @param simulatedLease the lease to renew
+     * @param config provides the timeout used to tell active from inactive
+     * @return a new descriptor builder
+     */
+    public synchronized DiscoveryLiteDescriptorBuilder updateAndGetDescriptorFor(SimulatedLease simulatedLease, OakTestConfig config) {
+        return doUpdateAndGet(simulatedLease, config, true);
+    }
+
+    /** Must only be called while holding the monitor of this object. */
+    private DiscoveryLiteDescriptorBuilder doUpdateAndGet(SimulatedLease simulatedLease, OakTestConfig config, boolean updateLease) {
+        int clusterNodeId = getClusterNodeId(simulatedLease.getSlingId());
+        if (updateLease) {
+            leaseUpdates.put(simulatedLease.getSlingId(), System.currentTimeMillis());
+        }
+        DiscoveryLiteDescriptorBuilder discoBuilder =
+                new DiscoveryLiteDescriptorBuilder();
+        discoBuilder.me(clusterNodeId);
+        discoBuilder.id(viewId);
+        discoBuilder.setFinal(true);
+        // sort every instance that ever updated its lease into active or inactive
+        List<Integer> actives = new LinkedList<Integer>();
+        List<Integer> inactives = new LinkedList<Integer>();
+        for (Map.Entry<String, Long> entry : leaseUpdates.entrySet()) {
+            int id = getClusterNodeId(entry.getKey());
+            if (isTimedout(entry.getValue(), config)) {
+                inactives.add(id);
+            } else {
+                actives.add(id);
+            }
+        }
+        discoBuilder.activeIds(actives.toArray(new Integer[0]));
+        discoBuilder.inactiveIds(inactives.toArray(new Integer[0]));
+        return discoBuilder;
+    }
+
+    /**
+     * @return true if the given heartbeat is older than the view checker
+     * timeout, which is multiplied by 1000 before being compared to millis
+     */
+    private boolean isTimedout(Long lastHeartbeat, OakTestConfig config) {
+        return System.currentTimeMillis() > lastHeartbeat + (1000 * config.getViewCheckerTimeout());
+    }
+
+    /** Returns the cluster node id of the given sling id, assigning the next free one if needed. */
+    private int getClusterNodeId(String slingId) {
+        Integer id = clusterNodeIds.get(slingId);
+        if (id==null) {
+            id = ++highestId;
+            clusterNodeIds.put(slingId, id);
+        }
+        return id;
+    }
+
+    /**
+     * Forgets all sling id to cluster node id assignments.
+     * <p>
+     * Synchronized like every other method touching clusterNodeIds, as the
+     * underlying HashMap must not be modified concurrently.
+     */
+    public synchronized void reset() {
+        // NOTE(review): leaseUpdates and highestId are left untouched here -
+        // confirm against the callers whether that is intended
+        clusterNodeIds.clear();
+    }
+
+}
diff --git a/src/test/resources/log4j.properties b/src/test/resources/log4j.properties
new file mode 100644
index 0000000..7db291c
--- /dev/null
+++ b/src/test/resources/log4j.properties
@@ -0,0 +1,26 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+log4j.rootLogger=INFO, stdout
+
+log4j.logger.org.apache.jackrabbit.core.TransientRepository=WARN
+#log4j.logger.org.apache.sling.discovery.impl=DEBUG
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+#log4j.appender.stdout.layout.ConversionPattern=%d{dd.MM.yyyy HH:mm:ss} *%-5p* [%t] %c{1}: %m (%F, line %L)\n
+log4j.appender.stdout.layout.ConversionPattern=%d{dd.MM.yyyy HH:mm:ss} *%-5p* [%t] %c{1}: %m\n
--
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.