You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2015/10/20 16:12:34 UTC
svn commit: r1709601 [10/11] - in /sling/trunk/bundles/extensions/discovery:
base/ base/src/ base/src/main/ base/src/main/java/ base/src/main/java/org/
base/src/main/java/org/apache/ base/src/main/java/org/apache/sling/
base/src/main/java/org/apache/sl...
Modified: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/TestOakSyncTokenConsistencyService.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/TestOakSyncTokenConsistencyService.java?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/TestOakSyncTokenConsistencyService.java (original)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/spi/impl/TestOakSyncTokenConsistencyService.java Tue Oct 20 14:12:31 2015
@@ -19,25 +19,17 @@
package org.apache.sling.discovery.commons.providers.spi.impl;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
-import java.lang.reflect.Method;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import javax.jcr.Repository;
-import javax.jcr.Session;
-
-import org.apache.jackrabbit.commons.SimpleValueFactory;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
-import org.apache.jackrabbit.oak.util.GenericDescriptors;
-import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.discovery.commons.providers.DummyTopologyView;
import org.apache.sling.discovery.commons.providers.ViewStateManager;
-import org.apache.sling.discovery.commons.providers.impl.Listener;
-import org.apache.sling.discovery.commons.providers.impl.SimpleTopologyView;
+import org.apache.sling.discovery.commons.providers.impl.DummyListener;
import org.apache.sling.discovery.commons.providers.impl.TestHelper;
import org.apache.sling.discovery.commons.providers.impl.ViewStateManagerFactory;
import org.apache.sling.jcr.api.SlingRepository;
@@ -47,46 +39,86 @@ import org.junit.Test;
public class TestOakSyncTokenConsistencyService {
+ private static final String SYNCTOKEN_PATH = "/var/discovery/commons/synctokens";
+
+ private static final String IDMAP_PATH = "/var/discovery/commons/idmap";
+
+ public final class SimpleCommonsConfig implements DiscoveryLiteConfig {
+
+ private long bgIntervalMillis;
+ private long bgTimeoutMillis;
+
+ SimpleCommonsConfig() {
+ this(1000, -1); // defaults
+ }
+
+ SimpleCommonsConfig(long bgIntervalMillis, long bgTimeoutMillis) {
+ this.bgIntervalMillis = bgIntervalMillis;
+ this.bgTimeoutMillis = bgTimeoutMillis;
+ }
+
+ @Override
+ public String getSyncTokenPath() {
+ return SYNCTOKEN_PATH;
+ }
+
+ @Override
+ public String getIdMapPath() {
+ return IDMAP_PATH;
+ }
+
+ @Override
+ public long getBgTimeoutMillis() {
+ return bgTimeoutMillis;
+ }
+
+ @Override
+ public long getBgIntervalMillis() {
+ return bgIntervalMillis;
+ }
+
+ }
+
ResourceResolverFactory factory1;
ResourceResolverFactory factory2;
private SlingRepository repository1;
private SlingRepository repository2;
private MemoryNodeStore memoryNS;
+ private IdMapService idMapService1;
+ private String slingId1;
@Before
public void setup() throws Exception {
- MockFactory.resetRepo();
+ RepositoryTestHelper.resetRepo();
memoryNS = new MemoryNodeStore();
- repository1 = RepositoryHelper.newOakRepository(memoryNS);
-// repository1 = MultipleRepositoriesSupport.newRepository("target/repo1");
- RepositoryHelper.initSlingNodeTypes(repository1);
- repository2 = RepositoryHelper.newOakRepository(memoryNS);
-// repository2 = MultipleRepositoriesSupport.newRepository("target/repo2");
-// MultipleRepositoriesSupport.initSlingNodeTypes(repository2);
- factory1 = MockFactory.mockResourceResolverFactory(repository1);
- factory2 = MockFactory.mockResourceResolverFactory(repository2);
+ repository1 = RepositoryTestHelper.newOakRepository(memoryNS);
+ RepositoryTestHelper.initSlingNodeTypes(repository1);
+ repository2 = RepositoryTestHelper.newOakRepository(memoryNS);
+ factory1 = RepositoryTestHelper.mockResourceResolverFactory(repository1);
+ factory2 = RepositoryTestHelper.mockResourceResolverFactory(repository2);
+ slingId1 = UUID.randomUUID().toString();
+ idMapService1 = IdMapService.testConstructor(new SimpleCommonsConfig(), new DummySlingSettingsService(slingId1), factory1);
}
@After
public void tearDown() throws Exception {
if (repository1!=null) {
- RepositoryHelper.stopRepository(repository1);
+ RepositoryTestHelper.stopRepository(repository1);
repository1 = null;
}
if (repository2!=null) {
- RepositoryHelper.stopRepository(repository2);
+ RepositoryTestHelper.stopRepository(repository2);
repository2 = null;
}
}
@Test
public void testOneNode() throws Exception {
- String slingId1 = UUID.randomUUID().toString();
- SimpleTopologyView one = TestHelper.newView(true, slingId1, slingId1, slingId1);
+ DummyTopologyView one = TestHelper.newView(true, slingId1, slingId1, slingId1);
Lock lock = new ReentrantLock();
- OakSyncTokenConsistencyService cs = new OakSyncTokenConsistencyService(factory1, slingId1, -1, -1);
+ OakSyncTokenConsistencyService cs = OakSyncTokenConsistencyService.testConstructorAndActivate(new SimpleCommonsConfig(), idMapService1, new DummySlingSettingsService(slingId1), factory1);
ViewStateManager vsm = ViewStateManagerFactory.newViewStateManager(lock, cs);
- Listener l = new Listener();
+ DummyListener l = new DummyListener();
assertEquals(0, l.countEvents());
vsm.bind(l);
cs.triggerBackgroundCheck();
@@ -98,106 +130,67 @@ public class TestOakSyncTokenConsistency
cs.triggerBackgroundCheck();
assertEquals(0, l.countEvents());
cs.triggerBackgroundCheck();
- setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(1).activeIds(1));
+ DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().me(1).seq(1).activeIds(1).setFinal(true));
+ assertTrue(idMapService1.waitForInit(2000));
cs.triggerBackgroundCheck();
+ assertTrue(vsm.waitForAsyncEvents(1000));
assertEquals(1, l.countEvents());
}
@Test
public void testTwoNodesOneLeaving() throws Exception {
- String slingId1 = UUID.randomUUID().toString();
String slingId2 = UUID.randomUUID().toString();
- SimpleTopologyView two1 = TestHelper.newView(true, slingId1, slingId1, slingId1, slingId2);
+ DummyTopologyView two1 = TestHelper.newView(true, slingId1, slingId1, slingId1, slingId2);
Lock lock1 = new ReentrantLock();
- OakSyncTokenConsistencyService cs1 = new OakSyncTokenConsistencyService(factory1, slingId1, -1, -1);
+ OakSyncTokenConsistencyService cs1 = OakSyncTokenConsistencyService.testConstructorAndActivate(new SimpleCommonsConfig(), idMapService1, new DummySlingSettingsService(slingId1), factory1);
ViewStateManager vsm1 = ViewStateManagerFactory.newViewStateManager(lock1, cs1);
- Listener l = new Listener();
+ DummyListener l = new DummyListener();
vsm1.bind(l);
vsm1.handleActivated();
vsm1.handleNewView(two1);
cs1.triggerBackgroundCheck();
assertEquals(0, l.countEvents());
- setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(1).activeIds(1).deactivatingIds(2));
+ DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(1).seq(1).activeIds(1).deactivatingIds(2));
cs1.triggerBackgroundCheck();
assertEquals(0, l.countEvents());
- setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(2).activeIds(1));
+ DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(1).seq(2).activeIds(1));
cs1.triggerBackgroundCheck();
Lock lock2 = new ReentrantLock();
- OakSyncTokenConsistencyService cs2 = new OakSyncTokenConsistencyService(factory2, slingId2, -1, -1);
+ IdMapService idMapService2 = IdMapService.testConstructor(
+ new SimpleCommonsConfig(), new DummySlingSettingsService(slingId2), factory2);
+ OakSyncTokenConsistencyService cs2 = OakSyncTokenConsistencyService.testConstructorAndActivate(new SimpleCommonsConfig(), idMapService2, new DummySlingSettingsService(slingId2), factory2);
ViewStateManager vsm2 = ViewStateManagerFactory.newViewStateManager(lock2, cs2);
cs1.triggerBackgroundCheck();
cs2.triggerBackgroundCheck();
assertEquals(0, l.countEvents());
- setDiscoveryLiteDescriptor(factory2, new DiscoLite().me(2).seq(3).activeIds(1, 2));
+ DescriptorHelper.setDiscoveryLiteDescriptor(factory2, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(2).seq(3).activeIds(1, 2));
cs1.triggerBackgroundCheck();
cs2.triggerBackgroundCheck();
assertEquals(0, l.countEvents());
- setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(3).activeIds(1, 2));
+ DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(1).seq(3).activeIds(1, 2));
cs1.triggerBackgroundCheck();
cs2.triggerBackgroundCheck();
assertEquals(0, l.countEvents());
vsm2.handleActivated();
- SimpleTopologyView two2 = TestHelper.newView(two1.getLocalClusterSyncTokenId(), two1.getLocalInstance().getClusterView().getId(), true, slingId1, slingId1, slingId1, slingId2);
+ assertTrue(idMapService1.waitForInit(2000));
+ assertTrue(idMapService2.waitForInit(2000));
+ DummyTopologyView two2 = TestHelper.newView(two1.getLocalClusterSyncTokenId(), two1.getLocalInstance().getClusterView().getId(), true, slingId1, slingId1, slingId1, slingId2);
vsm2.handleNewView(two2);
cs1.triggerBackgroundCheck();
cs2.triggerBackgroundCheck();
assertEquals(1, l.countEvents());
- SimpleTopologyView oneLeaving = two1.clone();
+ DummyTopologyView oneLeaving = two1.clone();
oneLeaving.removeInstance(slingId2);
- setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(1).activeIds(1).deactivatingIds(2));
+ DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(1).seq(1).activeIds(1).deactivatingIds(2));
vsm1.handleNewView(oneLeaving);
cs1.triggerBackgroundCheck();
cs2.triggerBackgroundCheck();
assertEquals(2, l.countEvents());
- setDiscoveryLiteDescriptor(factory1, new DiscoLite().me(1).seq(2).activeIds(1).inactiveIds(2));
+ DescriptorHelper.setDiscoveryLiteDescriptor(factory1, new DiscoveryLiteDescriptorBuilder().setFinal(true).me(1).seq(2).activeIds(1).inactiveIds(2));
cs1.triggerBackgroundCheck();
cs2.triggerBackgroundCheck();
- RepositoryHelper.dumpRepo(factory1);
+ RepositoryTestHelper.dumpRepo(factory1);
assertEquals(3, l.countEvents());
}
- private void setDiscoveryLiteDescriptor(ResourceResolverFactory factory, DiscoLite builder) throws JSONException, Exception {
- setDescriptor(factory, OakSyncTokenConsistencyService.OAK_DISCOVERYLITE_CLUSTERVIEW, builder.asJson());
- }
-
- private void setDescriptor(ResourceResolverFactory factory, String key,
- String value) throws Exception {
- ResourceResolver resourceResolver = factory.getAdministrativeResourceResolver(null);
- try{
- Session session = resourceResolver.adaptTo(Session.class);
- if (session == null) {
- return;
- }
- Repository repo = session.getRepository();
-
- //<hack>
-// Method setDescriptorMethod = repo.getClass().
-// getDeclaredMethod("setDescriptor", String.class, String.class);
-// if (setDescriptorMethod!=null) {
-// setDescriptorMethod.setAccessible(true);
-// setDescriptorMethod.invoke(repo, key, value);
-// } else {
-// fail("could not get 'setDescriptor' method");
-// }
- Method getDescriptorsMethod = repo.getClass().getDeclaredMethod("getDescriptors");
- if (getDescriptorsMethod==null) {
- fail("could not get 'getDescriptors' method");
- } else {
- getDescriptorsMethod.setAccessible(true);
- GenericDescriptors descriptors = (GenericDescriptors) getDescriptorsMethod.invoke(repo);
- SimpleValueFactory valueFactory = new SimpleValueFactory();
- descriptors.put(key, valueFactory.createValue(value), true, true);
- }
- //</hack>
-
- //<verify-hack>
- assertEquals(value, repo.getDescriptor(key));
- //</verify-hack>
- } finally {
- if (resourceResolver!=null) {
- resourceResolver.close();
- }
- }
- }
-
}
Modified: sling/trunk/bundles/extensions/discovery/impl/pom.xml
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/pom.xml?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/pom.xml (original)
+++ sling/trunk/bundles/extensions/discovery/impl/pom.xml Tue Oct 20 14:12:31 2015
@@ -132,6 +132,32 @@
<version>1.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
+ <!-- Besides the regular discovery.commons jar declared above,
+ a few of its test helper classes are reused by the tests of this bundle.
+ To make them available, also add a test-scoped test-jar dependency: -->
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.commons</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.base</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Besides the regular discovery.base jar declared above,
+ a few of its test helper classes are reused by the tests of this bundle.
+ To make them available, also add a test-scoped test-jar dependency: -->
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.discovery.base</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.api</artifactId>
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/Config.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/Config.java?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/Config.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/Config.java Tue Oct 20 14:12:31 2015
@@ -29,6 +29,7 @@ import org.apache.felix.scr.annotations.
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.discovery.base.connectors.BaseConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory;
*/
@Component(metatype = true, label="%config.name", description="%config.description")
@Service(value = { Config.class })
-public class Config {
+public class Config implements BaseConfig {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -339,7 +340,7 @@ public class Config {
* Returns the socket connect() timeout used by the topology connector, 0 disables the timeout
* @return the socket connect() timeout used by the topology connector, 0 disables the timeout
*/
- public int getConnectionTimeout() {
+ public int getSocketConnectionTimeout() {
return connectionTimeout;
}
@@ -387,12 +388,16 @@ public class Config {
return topologyConnectorWhitelist;
}
+ protected String getDiscoveryResourcePath() {
+ return discoveryResourcePath;
+ }
+
/**
* Returns the resource path where cluster instance informations are stored.
* @return the resource path where cluster instance informations are stored
*/
public String getClusterInstancesPath() {
- return discoveryResourcePath + CLUSTERINSTANCES_RESOURCE;
+ return getDiscoveryResourcePath() + CLUSTERINSTANCES_RESOURCE;
}
/**
@@ -400,7 +405,7 @@ public class Config {
* @return the resource path where the established view is stored
*/
public String getEstablishedViewPath() {
- return discoveryResourcePath + ESTABLISHED_VIEW_RESOURCE;
+ return getDiscoveryResourcePath() + ESTABLISHED_VIEW_RESOURCE;
}
/**
@@ -408,7 +413,7 @@ public class Config {
* @return the resource path where ongoing votings are stored
*/
public String getOngoingVotingsPath() {
- return discoveryResourcePath + ONGOING_VOTING_RESOURCE;
+ return getDiscoveryResourcePath() + ONGOING_VOTING_RESOURCE;
}
/**
@@ -416,7 +421,7 @@ public class Config {
* @return the resource path where the previous view is stored
*/
public String getPreviousViewPath() {
- return discoveryResourcePath + PREVIOUS_VIEW_RESOURCE;
+ return getDiscoveryResourcePath() + PREVIOUS_VIEW_RESOURCE;
}
/**
@@ -515,4 +520,15 @@ public class Config {
return factor * getHeartbeatInterval();
}
}
+
+ @Override
+ public long getConnectorPingInterval() {
+ return getHeartbeatInterval();
+ }
+
+ @Override
+ public long getConnectorPingTimeout() {
+ return getHeartbeatTimeout();
+ }
+
}
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/DiscoveryServiceImpl.java Tue Oct 20 14:12:31 2015
@@ -47,26 +47,25 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.scheduler.Scheduler;
-import org.apache.sling.discovery.ClusterView;
import org.apache.sling.discovery.DiscoveryService;
import org.apache.sling.discovery.InstanceDescription;
import org.apache.sling.discovery.PropertyProvider;
import org.apache.sling.discovery.TopologyEventListener;
-import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.base.commons.BaseDiscoveryService;
+import org.apache.sling.discovery.base.commons.ClusterViewService;
+import org.apache.sling.discovery.base.commons.DefaultTopologyView;
+import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
+import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry;
import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.DefaultClusterView;
+import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription;
import org.apache.sling.discovery.commons.providers.ViewStateManager;
import org.apache.sling.discovery.commons.providers.impl.ViewStateManagerFactory;
import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
-import org.apache.sling.discovery.impl.cluster.ClusterViewService;
-import org.apache.sling.discovery.impl.cluster.UndefinedClusterViewException;
-import org.apache.sling.discovery.impl.cluster.UndefinedClusterViewException.Reason;
-import org.apache.sling.discovery.impl.common.DefaultClusterViewImpl;
-import org.apache.sling.discovery.impl.common.DefaultInstanceDescriptionImpl;
+import org.apache.sling.discovery.commons.providers.spi.impl.IdMapService;
+import org.apache.sling.discovery.commons.providers.util.PropertyNameHelper;
+import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
import org.apache.sling.discovery.impl.common.heartbeat.HeartbeatHandler;
-import org.apache.sling.discovery.impl.common.resource.ResourceHelper;
-import org.apache.sling.discovery.impl.topology.TopologyViewImpl;
-import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry;
-import org.apache.sling.discovery.impl.topology.connector.ConnectorRegistry;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleContext;
import org.osgi.framework.Constants;
@@ -81,7 +80,7 @@ import org.slf4j.LoggerFactory;
*/
@Component(immediate = true)
@Service(value = { DiscoveryService.class, DiscoveryServiceImpl.class })
-public class DiscoveryServiceImpl implements DiscoveryService {
+public class DiscoveryServiceImpl extends BaseDiscoveryService {
private final static Logger logger = LoggerFactory.getLogger(DiscoveryServiceImpl.class);
@@ -126,19 +125,40 @@ public class DiscoveryServiceImpl implem
@Reference
private Config config;
+
+ @Reference
+ private IdMapService idMapService;
/** the slingId of the local instance **/
private String slingId;
- /** the old view previously valid and sent to the TopologyEventListeners **/
- private TopologyViewImpl oldView;
-
private ServiceRegistration mbeanRegistration;
private ViewStateManager viewStateManager;
private ReentrantLock viewStateManagerLock;
+ /** for testing only **/
+ public static BaseDiscoveryService testConstructor(ResourceResolverFactory resourceResolverFactory,
+ AnnouncementRegistry announcementRegistry,
+ ConnectorRegistry connectorRegistry,
+ ClusterViewService clusterViewService,
+ HeartbeatHandler heartbeatHandler,
+ SlingSettingsService settingsService,
+ Scheduler scheduler,
+ Config config) {
+ DiscoveryServiceImpl discoService = new DiscoveryServiceImpl();
+ discoService.resourceResolverFactory = resourceResolverFactory;
+ discoService.announcementRegistry = announcementRegistry;
+ discoService.connectorRegistry = connectorRegistry;
+ discoService.clusterViewService = clusterViewService;
+ discoService.heartbeatHandler = heartbeatHandler;
+ discoService.settingsService = settingsService;
+ discoService.scheduler = scheduler;
+ discoService.config = config;
+ return discoService;
+ }
+
public DiscoveryServiceImpl() {
viewStateManagerLock = new ReentrantLock();
final ConsistencyService consistencyService = new ConsistencyService() {
@@ -174,12 +194,23 @@ public class DiscoveryServiceImpl implem
}
}
- private void setOldView(TopologyViewImpl view) {
- if (view==null) {
- throw new IllegalArgumentException("view must not be null");
+ protected void handleIsolatedFromTopology() {
+ if (heartbeatHandler!=null) {
+ // SLING-5030 part 2: when we detect being isolated we should
+ // step at the end of the leader-election queue and
+ // that can be achieved by resetting the leaderElectionId
+ // (which will in turn take effect on the next round of
+ // voting, or also double-checked when the local instance votes)
+ //
+ //TODO:
+ // Note that when the local instance doesn't notice
+ // an 'ISOLATED_FROM_TOPOLOGY' case, then the leaderElectionId
+ // will not be reset. Which means that it then could potentially
+ // regain leadership.
+ if (heartbeatHandler.resetLeaderElectionId()) {
+ logger.info("getTopology: reset leaderElectionId to force this instance to the end of the instance order (thus incl not to remain leader)");
+ }
}
- logger.debug("setOldView: oldView is now: {}", oldView);
- oldView = view;
}
/**
@@ -209,19 +240,19 @@ public class DiscoveryServiceImpl implem
// this way for the single-instance case the clusterId can
// remain the same between a getTopology() that is invoked before
// the first TOPOLOGY_INIT and afterwards
- DefaultClusterViewImpl isolatedCluster = new DefaultClusterViewImpl(isolatedClusterId);
+ DefaultClusterView isolatedCluster = new DefaultClusterView(isolatedClusterId);
Map<String, String> emptyProperties = new HashMap<String, String>();
- DefaultInstanceDescriptionImpl isolatedInstance =
- new DefaultInstanceDescriptionImpl(isolatedCluster, true, true, slingId, emptyProperties);
+ DefaultInstanceDescription isolatedInstance =
+ new DefaultInstanceDescription(isolatedCluster, true, true, slingId, emptyProperties);
Collection<InstanceDescription> col = new ArrayList<InstanceDescription>();
col.add(isolatedInstance);
- final TopologyViewImpl topology = new TopologyViewImpl();
+ final DefaultTopologyView topology = new DefaultTopologyView();
topology.addInstances(col);
topology.setNotCurrent();
setOldView(topology);
}
- setOldView((TopologyViewImpl) getTopology());
- oldView.setNotCurrent();
+ setOldView((DefaultTopologyView) getTopology());
+ getOldView().setNotCurrent();
// make sure the first heartbeat is issued as soon as possible - which
// is right after this service starts. since the two (discoveryservice
@@ -235,7 +266,7 @@ public class DiscoveryServiceImpl implem
doUpdateProperties();
- TopologyViewImpl newView = (TopologyViewImpl) getTopology();
+ DefaultTopologyView newView = (DefaultTopologyView) getTopology();
if (newView.isCurrent()) {
viewStateManager.handleNewView(newView);
} else {
@@ -468,58 +499,6 @@ public class DiscoveryServiceImpl implem
}
/**
- * @see DiscoveryService#getTopology()
- */
- public TopologyView getTopology() {
- if (clusterViewService == null) {
- throw new IllegalStateException(
- "DiscoveryService not yet initialized with IClusterViewService");
- }
- // create a new topology view
- final TopologyViewImpl topology = new TopologyViewImpl();
-
- ClusterView localClusterView = null;
- try {
- localClusterView = clusterViewService.getClusterView();
- } catch (UndefinedClusterViewException e) {
- // SLING-5030 : when we're cut off from the local cluster we also
- // treat it as being cut off from the entire topology, ie we don't
- // update the announcements but just return
- // the previous oldView marked as !current
- logger.info("getTopology: undefined cluster view: "+e.getReason()+"] "+e);
- oldView.setNotCurrent();
- if (e.getReason()==Reason.ISOLATED_FROM_TOPOLOGY) {
- if (heartbeatHandler!=null) {
- // SLING-5030 part 2: when we detect being isolated we should
- // step at the end of the leader-election queue and
- // that can be achieved by resetting the leaderElectionId
- // (which will in turn take effect on the next round of
- // voting, or also double-checked when the local instance votes)
- //
- //TODO:
- // Note that when the local instance doesn't notice
- // an 'ISOLATED_FROM_TOPOLOGY' case, then the leaderElectionId
- // will not be reset. Which means that it then could potentially
- // regain leadership.
- if (heartbeatHandler.resetLeaderElectionId()) {
- logger.info("getTopology: reset leaderElectionId to force this instance to the end of the instance order (thus incl not to remain leader)");
- }
- }
- }
- return oldView;
- }
-
- final List<InstanceDescription> localInstances = localClusterView.getInstances();
- topology.addInstances(localInstances);
-
- Collection<InstanceDescription> attachedInstances = announcementRegistry
- .listInstances(localClusterView);
- topology.addInstances(attachedInstances);
-
- return topology;
- }
-
- /**
* Update the properties and sent a topology event if applicable
*/
public void updateProperties() {
@@ -576,7 +555,7 @@ public class DiscoveryServiceImpl implem
/** SLING-2883 : put property only if valid **/
private void putPropertyIfValid(final String name, final String val) {
- if (ResourceHelper.isValidPropertyName(name)) {
+ if (PropertyNameHelper.isValidPropertyName(name)) {
this.properties.put(name, val);
}
}
@@ -640,18 +619,26 @@ public class DiscoveryServiceImpl implem
logger.error("forcedShutdown: ignoring forced shutdown. Service is not activated.");
return;
}
- if (oldView == null) {
+ if (getOldView() == null) {
logger.error("forcedShutdown: ignoring forced shutdown. No oldView available.");
return;
}
logger.error("forcedShutdown: sending TOPOLOGY_CHANGING to all listeners");
// SLING-4638: make sure the oldView is really marked as old:
- oldView.setNotCurrent();
+ getOldView().setNotCurrent();
viewStateManager.handleChanging();
logger.error("forcedShutdown: deactivating DiscoveryService.");
// to make sure no further event is sent after this, flag this service as deactivated
activated = false;
}
}
+
+ protected ClusterViewService getClusterViewService() {
+ return clusterViewService;
+ }
+
+ protected AnnouncementRegistry getAnnouncementRegistry() {
+ return announcementRegistry;
+ }
}
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/TopologyWebConsolePlugin.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/TopologyWebConsolePlugin.java?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/TopologyWebConsolePlugin.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/TopologyWebConsolePlugin.java Tue Oct 20 14:12:31 2015
@@ -52,14 +52,14 @@ import org.apache.sling.discovery.Instan
import org.apache.sling.discovery.InstanceFilter;
import org.apache.sling.discovery.TopologyEvent;
import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.base.commons.ClusterViewService;
+import org.apache.sling.discovery.base.connectors.announcement.Announcement;
+import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
+import org.apache.sling.discovery.base.connectors.announcement.CachedAnnouncement;
+import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry;
+import org.apache.sling.discovery.base.connectors.ping.TopologyConnectorClientInformation;
import org.apache.sling.discovery.TopologyEventListener;
import org.apache.sling.discovery.TopologyView;
-import org.apache.sling.discovery.impl.cluster.ClusterViewService;
-import org.apache.sling.discovery.impl.topology.announcement.Announcement;
-import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry;
-import org.apache.sling.discovery.impl.topology.announcement.CachedAnnouncement;
-import org.apache.sling.discovery.impl.topology.connector.ConnectorRegistry;
-import org.apache.sling.discovery.impl.topology.connector.TopologyConnectorClientInformation;
import org.osgi.framework.BundleContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -430,8 +430,8 @@ public class TopologyWebConsolePlugin ex
pw.println("<td><b>not connected</b></td>");
pw.println("<td"+tooltip+"><b>not ok (HTTP Status-Code: "+statusCode+", "+statusDetails+")</b></td>");
}
- pw.println("<td>"+beautifiedTimeDiff(topologyConnectorClient.getLastHeartbeatSent())+"</td>");
- pw.println("<td>"+beautifiedDueTime(topologyConnectorClient.getNextHeartbeatDue())+"</td>");
+ pw.println("<td>"+beautifiedTimeDiff(topologyConnectorClient.getLastPingSent())+"</td>");
+ pw.println("<td>"+beautifiedDueTime(topologyConnectorClient.getNextPingDue())+"</td>");
pw.println("<td>"+topologyConnectorClient.getLastRequestEncoding()+"</td>");
pw.println("<td>"+topologyConnectorClient.getLastResponseEncoding()+"</td>");
// //TODO fallback urls are not yet implemented!
@@ -525,7 +525,7 @@ public class TopologyWebConsolePlugin ex
} else {
pw.println("<td><i>n/a</i></td>");
}
- pw.println("<td>"+beautifiedTimeDiff(incomingCachedAnnouncement.getLastHeartbeat())+"</td>");
+ pw.println("<td>"+beautifiedTimeDiff(incomingCachedAnnouncement.getLastPing())+"</td>");
pw.println("<td>"+beautifiedDueTime(incomingCachedAnnouncement.getSecondsUntilTimeout())+"</td>");
pw.println("</tr>");
@@ -784,7 +784,7 @@ public class TopologyWebConsolePlugin ex
pw.print(incomingAnnouncement.getServerInfo());
pw.println();
}
- pw.println("Last heartbeat received : "+beautifiedTimeDiff(incomingCachedAnnouncement.getLastHeartbeat()));
+ pw.println("Last heartbeat received : "+beautifiedTimeDiff(incomingCachedAnnouncement.getLastPing()));
pw.println("Timeout : "+beautifiedDueTime(incomingCachedAnnouncement.getSecondsUntilTimeout()));
pw.println();
@@ -845,8 +845,8 @@ public class TopologyWebConsolePlugin ex
}
pw.print(" (HTTP StatusCode: "+statusCode+", "+statusDetails+")");
pw.println();
- pw.println("Last heartbeat sent : "+beautifiedTimeDiff(topologyConnectorClient.getLastHeartbeatSent()));
- pw.println("Next heartbeat due : "+beautifiedDueTime(topologyConnectorClient.getNextHeartbeatDue()));
+ pw.println("Last heartbeat sent : "+beautifiedTimeDiff(topologyConnectorClient.getLastPingSent()));
+ pw.println("Next heartbeat due : "+beautifiedDueTime(topologyConnectorClient.getNextPingDue()));
}
pw.println();
}
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/ClusterViewServiceImpl.java Tue Oct 20 14:12:31 2015
@@ -18,20 +18,18 @@
*/
package org.apache.sling.discovery.impl.cluster;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.discovery.ClusterView;
import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.base.commons.ClusterViewService;
+import org.apache.sling.discovery.base.commons.UndefinedClusterViewException;
+import org.apache.sling.discovery.base.commons.UndefinedClusterViewException.Reason;
+import org.apache.sling.discovery.commons.providers.spi.LocalClusterView;
import org.apache.sling.discovery.impl.Config;
-import org.apache.sling.discovery.impl.cluster.UndefinedClusterViewException.Reason;
import org.apache.sling.discovery.impl.common.View;
import org.apache.sling.discovery.impl.common.ViewHelper;
import org.apache.sling.discovery.impl.common.resource.EstablishedClusterView;
@@ -60,6 +58,15 @@ public class ClusterViewServiceImpl impl
@Reference
private Config config;
+ public static ClusterViewService testConstructor(SlingSettingsService settingsService,
+ ResourceResolverFactory factory, Config config) {
+ ClusterViewServiceImpl service = new ClusterViewServiceImpl();
+ service.settingsService = settingsService;
+ service.resourceResolverFactory = factory;
+ service.config = config;
+ return service;
+ }
+
public String getSlingId() {
if (settingsService==null) {
return null;
@@ -67,32 +74,7 @@ public class ClusterViewServiceImpl impl
return settingsService.getSlingId();
}
- public boolean contains(final String slingId) throws UndefinedClusterViewException {
- List<InstanceDescription> localInstances = getClusterView().getInstances();
- for (Iterator<InstanceDescription> it = localInstances.iterator(); it
- .hasNext();) {
- InstanceDescription aLocalInstance = it.next();
- if (aLocalInstance.getSlingId().equals(slingId)) {
- return true;
- }
- }
-
- return false;
- }
-
- public boolean containsAny(Collection<InstanceDescription> listInstances)
- throws UndefinedClusterViewException{
- for (Iterator<InstanceDescription> it = listInstances.iterator(); it
- .hasNext();) {
- InstanceDescription instanceDescription = it.next();
- if (contains(instanceDescription.getSlingId())) {
- return true;
- }
- }
- return false;
- }
-
- public ClusterView getClusterView() throws UndefinedClusterViewException {
+ public LocalClusterView getLocalClusterView() throws UndefinedClusterViewException {
if (resourceResolverFactory==null) {
logger.warn("getClusterView: no resourceResolverFactory set at the moment.");
throw new UndefinedClusterViewException(Reason.REPOSITORY_EXCEPTION,
@@ -112,16 +94,9 @@ public class ClusterViewServiceImpl impl
EstablishedClusterView clusterViewImpl = new EstablishedClusterView(
config, view, getSlingId());
- boolean foundLocal = false;
- for (Iterator<InstanceDescription> it = clusterViewImpl
- .getInstances().iterator(); it.hasNext();) {
- InstanceDescription instance = it.next();
- if (instance.isLocal()) {
- foundLocal = true;
- break;
- }
- }
- if (foundLocal) {
+
+ InstanceDescription local = clusterViewImpl.getLocalInstance();
+ if (local != null) {
return clusterViewImpl;
} else {
logger.info("getClusterView: the local instance ("+getSlingId()+") is currently not included in the existing established view! "
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHandler.java?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHandler.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHandler.java Tue Oct 20 14:12:31 2015
@@ -38,8 +38,8 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
import org.apache.sling.discovery.impl.Config;
-import org.apache.sling.discovery.impl.common.resource.ResourceHelper;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.Constants;
import org.osgi.service.component.ComponentContext;
@@ -84,6 +84,16 @@ public class VotingHandler implements Ev
* to ensure the leaderElectionId is correctly set upon voting
*/
private volatile String leaderElectionId;
+
+ /** for testing only **/
+ public static VotingHandler testConstructor(SlingSettingsService settingsService,
+ ResourceResolverFactory factory, Config config) {
+ VotingHandler handler = new VotingHandler();
+ handler.slingSettingsService = settingsService;
+ handler.resolverFactory = factory;
+ handler.config = config;
+ return handler;
+ }
protected void activate(final ComponentContext context) {
slingId = slingSettingsService.getSlingId();
@@ -140,6 +150,7 @@ public class VotingHandler implements Ev
*/
public synchronized void analyzeVotings(final ResourceResolver resourceResolver) throws PersistenceException {
// SLING-3406: refreshing resourceResolver/session here to get the latest state from the repository
+ logger.debug("analyzeVotings: start. slingId: {}", slingId);
resourceResolver.refresh();
VotingView winningVote = VotingHelper.getWinningVoting(
resourceResolver, config);
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHelper.java?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHelper.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingHelper.java Tue Oct 20 14:12:31 2015
@@ -25,8 +25,8 @@ import java.util.List;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
import org.apache.sling.discovery.impl.Config;
-import org.apache.sling.discovery.impl.common.resource.ResourceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingView.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingView.java?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingView.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/cluster/voting/VotingView.java Tue Oct 20 14:12:31 2015
@@ -34,10 +34,10 @@ import org.apache.sling.api.resource.Per
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
import org.apache.sling.discovery.impl.Config;
import org.apache.sling.discovery.impl.common.View;
import org.apache.sling.discovery.impl.common.ViewHelper;
-import org.apache.sling.discovery.impl.common.resource.ResourceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/heartbeat/HeartbeatHandler.java Tue Oct 20 14:12:31 2015
@@ -18,22 +18,15 @@
*/
package org.apache.sling.discovery.impl.common.heartbeat;
-import java.util.ArrayList;
import java.util.Calendar;
-import java.util.Collection;
import java.util.Date;
-import java.util.HashMap;
import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.jcr.Session;
-import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.ReferencePolicy;
@@ -45,25 +38,20 @@ import org.apache.sling.api.resource.Res
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.discovery.base.commons.BaseViewChecker;
+import org.apache.sling.discovery.base.connectors.announcement.AnnouncementRegistry;
+import org.apache.sling.discovery.base.connectors.ping.ConnectorRegistry;
+import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
import org.apache.sling.discovery.impl.Config;
import org.apache.sling.discovery.impl.DiscoveryServiceImpl;
import org.apache.sling.discovery.impl.cluster.voting.VotingHandler;
import org.apache.sling.discovery.impl.cluster.voting.VotingHelper;
import org.apache.sling.discovery.impl.cluster.voting.VotingView;
import org.apache.sling.discovery.impl.common.ViewHelper;
-import org.apache.sling.discovery.impl.common.resource.ResourceHelper;
-import org.apache.sling.discovery.impl.topology.announcement.AnnouncementRegistry;
-import org.apache.sling.discovery.impl.topology.connector.ConnectorRegistry;
import org.apache.sling.launchpad.api.StartupListener;
-import org.apache.sling.launchpad.api.StartupMode;
import org.apache.sling.settings.SlingSettingsService;
import org.osgi.framework.BundleException;
-import org.osgi.framework.Constants;
-import org.osgi.framework.ServiceReference;
-import org.osgi.service.component.ComponentContext;
import org.osgi.service.http.HttpService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* The heartbeat handler is responsible and capable of issuing both local and
@@ -77,51 +65,16 @@ import org.slf4j.LoggerFactory;
@Reference(referenceInterface=HttpService.class,
cardinality=ReferenceCardinality.OPTIONAL_MULTIPLE,
policy=ReferencePolicy.DYNAMIC)
-public class HeartbeatHandler implements Runnable, StartupListener {
+public class HeartbeatHandler extends BaseViewChecker {
private static final String PROPERTY_ID_LAST_HEARTBEAT = "lastHeartbeat";
- private static final String PROPERTY_ID_ENDPOINTS = "endpoints";
-
- private static final String PROPERTY_ID_SLING_HOME_PATH = "slingHomePath";
-
- private static final String PROPERTY_ID_RUNTIME = "runtimeId";
-
- private final Logger logger = LoggerFactory.getLogger(this.getClass());
-
- /** Endpoint service registration property from RFC 189 */
- private static final String REG_PROPERTY_ENDPOINTS = "osgi.http.service.endpoints";
-
- /** the name used for the period job with the scheduler **/
- private String NAME = "discovery.impl.heartbeat.runner.";
-
- @Reference
- private SlingSettingsService slingSettingsService;
-
- @Reference
- private ResourceResolverFactory resourceResolverFactory;
-
- @Reference
- private ConnectorRegistry connectorRegistry;
-
- @Reference
- private AnnouncementRegistry announcementRegistry;
-
- @Reference
- private Scheduler scheduler;
-
@Reference
private Config config;
@Reference
private VotingHandler votingHandler;
- /** the discovery service reference is used to get properties updated before heartbeats are sent **/
- private DiscoveryServiceImpl discoveryService;
-
- /** the sling id of the local instance **/
- private String slingId;
-
/** the id which is to be used for the next voting **/
private String nextVotingId = UUID.randomUUID().toString();
@@ -135,81 +88,50 @@ public class HeartbeatHandler implements
*/
private volatile String newLeaderElectionId;
- /** lock object for synchronizing the run method **/
- private final Object lock = new Object();
-
/** SLING-2892: remember first heartbeat written to repository by this instance **/
private long firstHeartbeatWritten = -1;
/** SLING-2892: remember the value of the heartbeat this instance has written the last time **/
private Calendar lastHeartbeatWritten = null;
- /** SLING-2895: avoid heartbeats after deactivation **/
- private volatile boolean activated = false;
-
- /** SLING-2901: the runtimeId is a unique id, set on activation, used for robust duplicate sling.id detection **/
- private String runtimeId;
-
- /** keep a reference to the component context **/
- private ComponentContext context;
-
- /** SLING-2968 : start issuing remote heartbeats only after startup finished **/
- private boolean startupFinished = false;
-
- /** SLING-3382 : force ping instructs the servlet to start the backoff from scratch again **/
- private boolean forcePing;
+ private DiscoveryServiceImpl discoveryServiceImpl;
- /** SLING-4765 : store endpoints to /clusterInstances for more verbose duplicate slingId/ghost detection **/
- private final Map<Long, String[]> endpoints = new HashMap<Long, String[]>();
-
- public void inform(StartupMode mode, boolean finished) {
- if (finished) {
- startupFinished(mode);
- }
- }
-
- public void startupFinished(StartupMode mode) {
- synchronized(lock) {
- startupFinished = true;
- issueHeartbeat();
- }
- }
-
- public void startupProgress(float ratio) {
- // we dont care
- }
-
- @Activate
- protected void activate(ComponentContext context) {
- synchronized(lock) {
- this.context = context;
-
- slingId = slingSettingsService.getSlingId();
- NAME = "discovery.impl.heartbeat.runner." + slingId;
- // on activate the resetLeaderElectionId is set to true to ensure that
- // the 'leaderElectionId' property is reset on next heartbeat issuance.
- // the idea being that a node which leaves the cluster should not
- // become leader on next join - and by resetting the leaderElectionId
- // to the current time, this is ensured.
- resetLeaderElectionId = true;
- runtimeId = UUID.randomUUID().toString();
-
- // SLING-2895: reset variables to avoid unnecessary log.error
- firstHeartbeatWritten = -1;
- lastHeartbeatWritten = null;
+ /** for testing only **/
+ public static HeartbeatHandler testConstructor(
+ SlingSettingsService slingSettingsService,
+ ResourceResolverFactory factory,
+ AnnouncementRegistry announcementRegistry,
+ ConnectorRegistry connectorRegistry,
+ Config config,
+ Scheduler scheduler) {
+ HeartbeatHandler handler = new HeartbeatHandler();
+ handler.slingSettingsService = slingSettingsService;
+ handler.resourceResolverFactory = factory;
+ handler.announcementRegistry = announcementRegistry;
+ handler.connectorRegistry = connectorRegistry;
+ handler.connectorConfig = config;
+ handler.config = config;
+ handler.scheduler = scheduler;
+ return handler;
+ }
+
+ @Override
+ protected void doActivate() {
+ // on activate the resetLeaderElectionId is set to true to ensure that
+ // the 'leaderElectionId' property is reset on next heartbeat issuance.
+ // the idea being that a node which leaves the cluster should not
+ // become leader on next join - and by resetting the leaderElectionId
+ // to the current time, this is ensured.
+ resetLeaderElectionId = true;
+ runtimeId = UUID.randomUUID().toString();
- activated = true;
- logger.info("activate: activated with runtimeId: {}, slingId: {}", runtimeId, slingId);
- }
- }
+ // SLING-2895: reset variables to avoid unnecessary log.error
+ firstHeartbeatWritten = -1;
+ lastHeartbeatWritten = null;
- @Deactivate
- protected void deactivate() {
- // SLING-3365 : dont synchronize on deactivate
- activated = false;
- scheduler.removeJob(NAME);
+ logger.info("doActivate: activated with runtimeId: {}, slingId: {}", runtimeId, slingId);
}
-
+
/**
* The initialize method is called by the DiscoveryServiceImpl.activate
* as we require the discoveryService (and the discoveryService has
@@ -228,6 +150,7 @@ public class HeartbeatHandler implements
final String initialVotingId) {
synchronized(lock) {
this.discoveryService = discoveryService;
+ this.discoveryServiceImpl = discoveryService;
this.nextVotingId = initialVotingId;
logger.info("initialize: nextVotingId="+nextVotingId);
issueHeartbeat();
@@ -236,6 +159,9 @@ public class HeartbeatHandler implements
try {
final long interval = config.getHeartbeatInterval();
logger.info("initialize: starting periodic heartbeat job for "+slingId+" with interval "+interval+" sec.");
+ if (interval==0) {
+ logger.warn("initialize: Repeat interval cannot be zero.");
+ }
scheduler.addPeriodicJob(NAME, this,
null, interval, false);
} catch (Exception e) {
@@ -243,21 +169,6 @@ public class HeartbeatHandler implements
}
}
- public void run() {
- synchronized(lock) {
- if (!activated) {
- // SLING:2895: avoid heartbeats if not activated
- return;
- }
-
- // issue a heartbeat
- issueHeartbeat();
-
- // check the view
- checkView();
- }
- }
-
/** Get or create a ResourceResolver **/
private ResourceResolver getResourceResolver() throws LoginException {
if (resourceResolverFactory == null) {
@@ -272,21 +183,6 @@ public class HeartbeatHandler implements
return config.getClusterInstancesPath() + "/" + slingId;
}
- /** Trigger the issuance of the next heartbeat asap instead of at next heartbeat interval **/
- public void triggerHeartbeat() {
- forcePing = true;
- try {
- // then fire a job immediately
- // use 'fireJobAt' here, instead of 'fireJob' to make sure the job can always be triggered
- // 'fireJob' checks for a job from the same job-class to already exist
- // 'fireJobAt' though allows to pass a name for the job - which can be made unique, thus does not conflict/already-exist
- logger.info("triggerHeartbeat: firing job to trigger heartbeat");
- scheduler.fireJobAt(NAME+UUID.randomUUID(), this, null, new Date(System.currentTimeMillis()-1000 /* make sure it gets triggered immediately*/));
- } catch (Exception e) {
- logger.info("triggerHeartbeat: Could not trigger heartbeat: " + e);
- }
- }
-
/**
* Hook that will cause a reset of the leaderElectionId
* on next invocation of issueClusterLocalHeartbeat.
@@ -333,31 +229,14 @@ public class HeartbeatHandler implements
* and then a remote heartbeat (to all the topology connectors
* which announce this part of the topology to others)
*/
- void issueHeartbeat() {
+ protected void issueHeartbeat() {
if (discoveryService == null) {
logger.error("issueHeartbeat: discoveryService is null");
} else {
discoveryService.updateProperties();
}
issueClusterLocalHeartbeat();
- issueRemoteHeartbeats();
- }
-
- /** Issue a remote heartbeat using the topology connectors **/
- private void issueRemoteHeartbeats() {
- if (connectorRegistry == null) {
- logger.error("issueRemoteHeartbeats: connectorRegistry is null");
- return;
- }
- if (!startupFinished) {
- logger.debug("issueRemoteHeartbeats: not issuing remote heartbeat yet, startup not yet finished");
- return;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("issueRemoteHeartbeats: pinging outgoing topology connectors (if there is any) for "+slingId);
- }
- connectorRegistry.pingOutgoingConnectors(forcePing);
- forcePing = false;
+ issueConnectorPings();
}
/** Issue a cluster local heartbeat (into the repository) **/
@@ -429,7 +308,7 @@ public class HeartbeatHandler implements
" Check for sling.id.file in your installation of all instances in this cluster " +
"to verify this! Duplicate sling.ids are not allowed within a cluster!");
logger.error("issueClusterLocalHeartbeat: sending TOPOLOGY_CHANGING before self-disabling.");
- discoveryService.forcedShutdown();
+ discoveryServiceImpl.forcedShutdown();
logger.error("issueClusterLocalHeartbeat: disabling discovery.impl");
activated = false;
if (context!=null) {
@@ -530,18 +409,13 @@ public class HeartbeatHandler implements
/** Check whether the established view matches the reality, ie matches the
* heartbeats
*/
- void checkView() {
- // check the remotes first
- if (announcementRegistry == null) {
- logger.error("announcementRegistry is null");
- return;
- }
- announcementRegistry.checkExpiredAnnouncements();
+ protected void doCheckView() {
+ super.doCheckView();
ResourceResolver resourceResolver = null;
try {
resourceResolver = getResourceResolver();
- doCheckView(resourceResolver);
+ doCheckViewWith(resourceResolver);
} catch (LoginException e) {
logger.error("checkView: could not log in administratively: " + e,
e);
@@ -558,16 +432,16 @@ public class HeartbeatHandler implements
/** do the established-against-heartbeat view check using the given resourceResolver.
*/
- private void doCheckView(final ResourceResolver resourceResolver) throws PersistenceException {
+ private void doCheckViewWith(final ResourceResolver resourceResolver) throws PersistenceException {
if (votingHandler==null) {
- logger.info("doCheckView: votingHandler is null! slingId="+slingId);
+ logger.info("doCheckViewWith: votingHandler is null! slingId="+slingId);
} else {
votingHandler.analyzeVotings(resourceResolver);
try{
votingHandler.cleanupTimedoutVotings(resourceResolver);
} catch(Exception e) {
- logger.warn("doCheckView: Exception occurred while cleaning up votings: "+e, e);
+ logger.warn("doCheckViewWith: Exception occurred while cleaning up votings: "+e, e);
}
}
@@ -580,11 +454,11 @@ public class HeartbeatHandler implements
// settle
// but first: make sure we sent the TOPOLOGY_CHANGING
- logger.info("doCheckView: there are pending votings, marking topology as changing...");
+ logger.info("doCheckViewWith: there are pending votings, marking topology as changing...");
discoveryService.handleTopologyChanging();
if (logger.isDebugEnabled()) {
- logger.debug("doCheckView: "
+ logger.debug("doCheckViewWith: "
+ numOpenNonWinningVotes
+ " ongoing votings, no one winning yet - I shall wait for them to settle.");
}
@@ -600,19 +474,19 @@ public class HeartbeatHandler implements
// that's the normal case. the established view matches what we're
// seeing.
// all happy and fine
- logger.debug("doCheckView: no pending nor winning votes. view is fine. we're all happy.");
+ logger.debug("doCheckViewWith: no pending nor winning votes. view is fine. we're all happy.");
return;
}
// immediately send a TOPOLOGY_CHANGING - could already be sent, but just to be sure
- logger.info("doCheckView: no matching established view, marking topology as changing");
+ logger.info("doCheckViewWith: no matching established view, marking topology as changing");
discoveryService.handleTopologyChanging();
if (logger.isDebugEnabled()) {
- logger.debug("doCheckView: no pending nor winning votes. But: view does not match established or no established yet. Initiating a new voting");
+ logger.debug("doCheckViewWith: no pending nor winning votes. But: view does not match established or no established yet. Initiating a new voting");
Iterator<String> it = liveInstances.iterator();
while (it.hasNext()) {
- logger.debug("doCheckView: one of the live instances is: "
+ logger.debug("doCheckViewWith: one of the live instances is: "
+ it.next());
}
}
@@ -663,89 +537,5 @@ public class HeartbeatHandler implements
}
}
}
-
- /**
- * Bind a http service
- */
- protected void bindHttpService(final ServiceReference reference) {
- final String[] endpointUrls = toStringArray(reference.getProperty(REG_PROPERTY_ENDPOINTS));
- if ( endpointUrls != null ) {
- synchronized ( lock ) {
- this.endpoints.put((Long)reference.getProperty(Constants.SERVICE_ID), endpointUrls);
-
- // make sure this gets written on next heartbeat
- firstHeartbeatWritten = -1;
- lastHeartbeatWritten = null;
- }
- }
- }
-
- /**
- * Unbind a http service
- */
- protected void unbindHttpService(final ServiceReference reference) {
- synchronized ( lock ) {
- if ( this.endpoints.remove(reference.getProperty(Constants.SERVICE_ID)) != null ) {
- // make sure the change gets written on next heartbeat
- firstHeartbeatWritten = -1;
- lastHeartbeatWritten = null;
- }
- }
- }
-
- private String[] toStringArray(final Object propValue) {
- if (propValue == null) {
- // no value at all
- return null;
-
- } else if (propValue instanceof String) {
- // single string
- return new String[] { (String) propValue };
-
- } else if (propValue instanceof String[]) {
- // String[]
- return (String[]) propValue;
-
- } else if (propValue.getClass().isArray()) {
- // other array
- Object[] valueArray = (Object[]) propValue;
- List<String> values = new ArrayList<String>(valueArray.length);
- for (Object value : valueArray) {
- if (value != null) {
- values.add(value.toString());
- }
- }
- return values.toArray(new String[values.size()]);
-
- } else if (propValue instanceof Collection<?>) {
- // collection
- Collection<?> valueCollection = (Collection<?>) propValue;
- List<String> valueList = new ArrayList<String>(valueCollection.size());
- for (Object value : valueCollection) {
- if (value != null) {
- valueList.add(value.toString());
- }
- }
- return valueList.toArray(new String[valueList.size()]);
- }
-
- return null;
- }
- private String getEndpointsAsString() {
- final StringBuilder sb = new StringBuilder();
- boolean first = true;
- for(final String[] points : endpoints.values()) {
- for(final String point : points) {
- if ( first ) {
- first = false;
- } else {
- sb.append(",");
- }
- sb.append(point);
- }
- }
- return sb.toString();
-
- }
}
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedClusterView.java Tue Oct 20 14:12:31 2015
@@ -27,8 +27,8 @@ import java.util.List;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ValueMap;
import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.commons.providers.spi.LocalClusterView;
import org.apache.sling.discovery.impl.Config;
-import org.apache.sling.discovery.impl.common.DefaultClusterViewImpl;
import org.apache.sling.discovery.impl.common.View;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory;
* as stored in the repository at the according location
*
*/
-public class EstablishedClusterView extends DefaultClusterViewImpl {
+public class EstablishedClusterView extends LocalClusterView {
/**
* use static logger to avoid frequent initialization as is potentially the
@@ -50,7 +50,7 @@ public class EstablishedClusterView exte
/** Construct a new established cluster view **/
public EstablishedClusterView(final Config config, final View view,
final String localId) {
- super(view.getViewId());
+ super(view.getViewId(), null /* localClusterSyncTokenId not supported */);
final Resource viewRes = view.getResource();
if (viewRes == null) {
Modified: sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedInstanceDescription.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedInstanceDescription.java?rev=1709601&r1=1709600&r2=1709601&view=diff
==============================================================================
--- sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedInstanceDescription.java (original)
+++ sling/trunk/bundles/extensions/discovery/impl/src/main/java/org/apache/sling/discovery/impl/common/resource/EstablishedInstanceDescription.java Tue Oct 20 14:12:31 2015
@@ -24,17 +24,17 @@ import java.util.Map;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ValueMap;
-import org.apache.sling.discovery.impl.common.DefaultClusterViewImpl;
-import org.apache.sling.discovery.impl.common.DefaultInstanceDescriptionImpl;
+import org.apache.sling.discovery.commons.providers.DefaultClusterView;
+import org.apache.sling.discovery.commons.providers.DefaultInstanceDescription;
/**
* An InstanceDescription which reads the properties from the according location
* in the repository
*/
public class EstablishedInstanceDescription extends
- DefaultInstanceDescriptionImpl {
+ DefaultInstanceDescription {
- public EstablishedInstanceDescription(final DefaultClusterViewImpl clusterView,
+ public EstablishedInstanceDescription(final DefaultClusterView clusterView,
final Resource res, final String slingId, final boolean isLeader, final boolean isOwn) {
super(clusterView, isLeader, isOwn, slingId, null);