You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by st...@apache.org on 2015/10/08 16:31:46 UTC
svn commit: r1707548 [2/4] - in
/sling/trunk/bundles/extensions/discovery/commons: ./
src/main/java/org/apache/sling/discovery/commons/providers/
src/main/java/org/apache/sling/discovery/commons/providers/impl/
src/main/java/org/apache/sling/discovery/...
Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.spi.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jcr.Session;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.commons.json.JSONArray;
+import org.apache.sling.commons.json.JSONException;
+import org.apache.sling.commons.json.JSONObject;
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Inherits the 'sync-token' part from the SyncTokenConsistencyService
+ * and adds the 'wait while backlog' part to it, based on
+ * the Oak discovery-lite descriptor.
+ */
+public class OakSyncTokenConsistencyService extends SyncTokenConsistencyService {
+
+ private static final String IDMAP_PATH = "/var/discovery/commons/idmap";
+
+ static enum BacklogStatus {
+ UNDEFINED /* when there was an error retrieving the backlog status with oak */,
+ HAS_BACKLOG /* when oak's discovery lite descriptor indicated that there is still some backlog */,
+ NO_BACKLOG /* when oak's discovery lite descriptor declared we're backlog-free now */
+ }
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ /** TODO: avoid hardcoding the constant here but use an Oak constant class instead if possible */
+ public static final String OAK_DISCOVERYLITE_CLUSTERVIEW = "oak.discoverylite.clusterview";
+
+ private boolean initialized = false;
+
+ private final long waitWhileBacklogTimeoutMillis;
+
+ /**
+ *
+ * @param resourceResolverFactory
+ * @param slingId the local slingId
+ * @param syncTokenTimeoutMillis timeout value in millis after which the
+ * sync-token process is cancelled - or -1 if no timeout should be used there
+ * @param waitWhileBacklogTimeoutMillis timeout value in millis after which
+ * the waiting-while-backlog should be cancelled - or -1 if no timeout should be
+ * used there
+ * @throws LoginException when the login for initialization failed
+ * @throws JSONException when the descriptor wasn't proper json at init time
+ */
+ public OakSyncTokenConsistencyService(ResourceResolverFactory resourceResolverFactory,
+ String slingId, long syncTokenTimeoutMillis, long waitWhileBacklogTimeoutMillis) {
+ super(resourceResolverFactory, slingId, syncTokenTimeoutMillis);
+ this.waitWhileBacklogTimeoutMillis = waitWhileBacklogTimeoutMillis;
+ startBackgroundCheck("idmap-initializer", new BackgroundCheck() {
+
+ @Override
+ public boolean check() {
+ return ensureInitialized();
+ }
+ }, null, -1);
+ }
+
+ private boolean ensureInitialized() {
+ if (initialized) {
+ return true;
+ }
+ logger.info("ensureInitialized: initializing.");
+ try {
+ initialized = init();
+ return initialized;
+ } catch (LoginException e) {
+ logger.error("ensureInitialized: could not login: "+e, e);
+ return false;
+ } catch (JSONException e) {
+ logger.error("ensureInitialized: got JSONException: "+e, e);
+ return false;
+ } catch (PersistenceException e) {
+ logger.error("ensureInitialized: got PersistenceException: "+e, e);
+ return false;
+ }
+ }
+
+ private boolean init() throws LoginException, JSONException, PersistenceException {
+ ResourceResolver resourceResolver = null;
+ try{
+ resourceResolver = getResourceResolver();
+ JSONObject descriptor = getDescriptor(resourceResolver);
+ if (descriptor == null) {
+ logger.info("init: could not yet get descriptor '"+OAK_DISCOVERYLITE_CLUSTERVIEW+"'!");
+ return false;
+ }
+ Object meObj = descriptor.get("me");
+ if (meObj == null || !(meObj instanceof Number)) {
+ logger.info("init: 'me' value of descriptor not a Number: "+meObj+" (descriptor: "+descriptor+")");
+ return false;
+ }
+ Number me = (Number)meObj;
+ final Resource resource = getOrCreateResource(resourceResolver, IDMAP_PATH);
+ ModifiableValueMap idmap = resource.adaptTo(ModifiableValueMap.class);
+ idmap.put(slingId, me.longValue());
+ resourceResolver.commit();
+ logger.info("init: mapped slingId="+slingId+" to discoveryLiteId="+me);
+ return true;
+ } finally {
+ if (resourceResolver!=null) {
+ resourceResolver.close();
+ }
+ }
+
+ }
+
+ @Override
+ public void sync(final BaseTopologyView view, final Runnable callback) {
+ // cancel the previous backgroundCheck if it's still running
+ cancelPreviousBackgroundCheck();
+
+ // first do the wait-for-backlog part
+ logger.info("sync: doing wait-for-backlog part for view="+view);
+ waitWhileBacklog(view, new Runnable() {
+
+ @Override
+ public void run() {
+ // when done, then do the sync-token part
+ logger.info("sync: doing sync-token part for view="+view);
+ syncToken(view, callback);
+ }
+
+ });
+ }
+
+ private void waitWhileBacklog(final BaseTopologyView view, final Runnable runnable) {
+ // start backgroundChecking until the backlogStatus
+ // is NO_BACKLOG
+ startBackgroundCheck("OakSyncTokenConsistencyService-waitWhileBacklog", new BackgroundCheck() {
+
+ @Override
+ public boolean check() {
+ if (!ensureInitialized()) {
+ logger.info("waitWhileBacklog: could not initialize...");
+ return false;
+ }
+ BacklogStatus backlogStatus = getBacklogStatus(view);
+ if (backlogStatus == BacklogStatus.NO_BACKLOG) {
+ logger.info("waitWhileBacklog: no backlog (anymore), done.");
+ return true;
+ } else {
+ logger.info("waitWhileBacklog: backlogStatus still "+backlogStatus);
+ return false;
+ }
+ }
+ }, runnable, waitWhileBacklogTimeoutMillis);
+ }
+
+ private BacklogStatus getBacklogStatus(BaseTopologyView view) {
+ logger.trace("getBacklogStatus: start");
+ ResourceResolver resourceResolver = null;
+ try{
+ resourceResolver = getResourceResolver();
+ JSONObject descriptor = getDescriptor(resourceResolver);
+ if (descriptor == null) {
+ logger.warn("getBacklogStatus: could not get descriptor '"+OAK_DISCOVERYLITE_CLUSTERVIEW+"'!");
+ return BacklogStatus.UNDEFINED;
+ }
+ // backlog-free means:
+ // 1) 'deactivating' must be empty
+ // (otherwise we indeed have a backlog)
+ // 2) all active ids of the descriptor must have a mapping to slingIds
+ // (otherwise the init failed or is pending for some instance(s))
+ // 3) all 'active' instances must be in the view
+ // (otherwise discovery lite might not yet consider
+ // an instance dead but discovery-service does)
+ // instead what is fine from a backlog point of view
+ // * instances in the view but listed as 'inactive'
+ // (this might be the case for just-started instances)
+ // * instances in the view but not contained in the descriptor at all
+ // (this might be the case for just-started instances)
+
+ Object activeObj = descriptor.get("active");
+ JSONArray active = (JSONArray) activeObj;
+ Object deactivatingObj = descriptor.get("deactivating");
+ JSONArray deactivating = (JSONArray) deactivatingObj;
+ // we're not worried about 'inactive' ones - as that could
+ // be a larger list filled with legacy entries too
+ // plus once the instance is inactive there's no need to
+ // check anything further - that one is then backlog-free
+
+ // 1) 'deactivating' must be empty
+ if (deactivating.length()!=0) {
+ logger.info("getBacklogStatus: there are deactivating instances: "+deactivating);
+ return BacklogStatus.HAS_BACKLOG;
+ }
+
+ Resource resource = getOrCreateResource(resourceResolver, IDMAP_PATH);
+ ValueMap idmapValueMap = resource.adaptTo(ValueMap.class);
+ ClusterView cluster = view.getLocalInstance().getClusterView();
+ Set<String> slingIds = new HashSet<String>();
+ for (InstanceDescription instance : cluster.getInstances()) {
+ slingIds.add(instance.getSlingId());
+ }
+ Map<Long, String> idmap = new HashMap<Long, String>();
+ for (String slingId : idmapValueMap.keySet()) {
+ Object value = idmapValueMap.get(slingId);
+ if (value instanceof Number) {
+ Number number = (Number)value;
+ idmap.put(number.longValue(), slingId);
+ }
+ }
+
+ for(int i=0; i<active.length(); i++) {
+ Number activeId = (Number) active.get(i);
+ String slingId = idmap.get(activeId.longValue());
+ // 2) all ids of the descriptor must have a mapping to slingIds
+ if (slingId == null) {
+ logger.info("getBacklogStatus: no slingId found for active id: "+activeId);
+ return BacklogStatus.UNDEFINED;
+ }
+ // 3) all 'active' instances must be in the view
+ if (!slingIds.contains(slingId)) {
+ logger.info("getBacklogStatus: active instance's ("+activeId+") slingId ("+slingId+") not found in cluster ("+cluster+")");
+ return BacklogStatus.HAS_BACKLOG;
+ }
+ }
+
+ logger.info("getBacklogStatus: no backlog (anymore)");
+ return BacklogStatus.NO_BACKLOG;
+ } catch (LoginException e) {
+ logger.error("getBacklogStatus: could not login: "+e, e);
+ return BacklogStatus.UNDEFINED;
+ } catch (JSONException e) {
+ logger.error("getBacklogStatus: got JSONException: "+e, e);
+ return BacklogStatus.UNDEFINED;
+ } catch (PersistenceException e) {
+ logger.error("getBacklogStatus: got PersistenceException: "+e, e);
+ return BacklogStatus.UNDEFINED;
+ } finally {
+ logger.trace("getBacklogStatus: end");
+ if (resourceResolver!=null) {
+ resourceResolver.close();
+ }
+ }
+ }
+
+ /**
+ * {"seq":8,"final":true,"id":"aae34e9a-b08d-409e-be10-9ff4106e5387","me":4,"active":[4],"deactivating":[],"inactive":[1,2,3]}
+ */
+ private JSONObject getDescriptor(ResourceResolver resourceResolver) throws JSONException {
+ Session session = resourceResolver.adaptTo(Session.class);
+ if (session == null) {
+ return null;
+ }
+ String descriptorStr = session.getRepository().getDescriptor(OAK_DISCOVERYLITE_CLUSTERVIEW);
+ if (descriptorStr == null) {
+ return null;
+ }
+ JSONObject descriptor = new JSONObject(descriptorStr);
+ return descriptor;
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/OakSyncTokenConsistencyService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.spi.impl;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ModifiableValueMap;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements the 'sync-token' part of the ConsistencyService,
+ * but not the 'wait while backlog' part (which is left to subclasses
+ * if needed).
+ */
+public class SyncTokenConsistencyService implements ConsistencyService {
+
+ private final Logger logger = LoggerFactory.getLogger(getClass());
+
+ private static final String SYNCTOKEN_PATH = "/var/discovery/commons/synctokens";
+
+ private static final String DEFAULT_RESOURCE_TYPE = "sling:Folder";
+
+ protected static Resource getOrCreateResource(
+ final ResourceResolver resourceResolver, final String path)
+ throws PersistenceException {
+ return ResourceUtil.getOrCreateResource(resourceResolver, path,
+ DEFAULT_RESOURCE_TYPE, DEFAULT_RESOURCE_TYPE, true);
+ }
+
+ private final class BackgroundCheckRunnable implements Runnable {
+ private final Runnable callback;
+ private final BackgroundCheck check;
+ private final long timeoutMillis;
+ private volatile boolean cancelled;
+ private final String threadName;
+
+ // for testing only:
+ private final Object waitObj = new Object();
+ private int waitCnt;
+ private volatile boolean done;
+
+ private BackgroundCheckRunnable(Runnable callback,
+ BackgroundCheck check, long timeoutMillis, String threadName) {
+ this.callback = callback;
+ this.check = check;
+ this.timeoutMillis = timeoutMillis;
+ this.threadName = threadName;
+ }
+
+ @Override
+ public void run() {
+ logger.debug("backgroundCheck.run: start");
+ long start = System.currentTimeMillis();
+ try{
+ while(!cancelled()) {
+ if (check.check()) {
+ if (callback != null) {
+ callback.run();
+ }
+ return;
+ }
+ if (timeoutMillis != -1 &&
+ (System.currentTimeMillis() > start + timeoutMillis)) {
+ if (callback == null) {
+ logger.info("backgroundCheck.run: timeout hit (no callback to invoke)");
+ } else {
+ logger.info("backgroundCheck.run: timeout hit, invoking callback.");
+ callback.run();
+ }
+ return;
+ }
+ logger.debug("backgroundCheck.run: waiting another sec.");
+ synchronized(waitObj) {
+ waitCnt++;
+ try {
+ waitObj.notify();
+ waitObj.wait(1000);
+ } catch (InterruptedException e) {
+ logger.info("backgroundCheck.run: got interrupted");
+ }
+ }
+ }
+ logger.debug("backgroundCheck.run: this run got cancelled. {}", check);
+ } catch(RuntimeException re) {
+ logger.error("backgroundCheck.run: RuntimeException: "+re, re);
+ // nevertheless calling runnable.run in this case
+ if (callback != null) {
+ logger.info("backgroundCheck.run: RuntimeException -> invoking callback");
+ callback.run();
+ }
+ throw re;
+ } catch(Error er) {
+ logger.error("backgroundCheck.run: Error: "+er, er);
+ // not calling runnable.run in this case!
+ // since Error is typically severe
+ logger.info("backgroundCheck.run: NOT invoking callback");
+ throw er;
+ } finally {
+ logger.debug("backgroundCheck.run: end");
+ synchronized(waitObj) {
+ done = true;
+ waitObj.notify();
+ }
+ }
+ }
+
+ boolean cancelled() {
+ return cancelled;
+ }
+
+ void cancel() {
+ logger.info("cancel: "+threadName);
+ cancelled = true;
+ }
+
+ public void triggerCheck() {
+ synchronized(waitObj) {
+ int waitCntAtStart = waitCnt;
+ waitObj.notify();
+ while(!done && waitCnt<=waitCntAtStart) {
+ try {
+ waitObj.wait();
+ } catch (InterruptedException e) {
+ throw new RuntimeException("got interrupted");
+ }
+ }
+ }
+ }
+ }
+
+ interface BackgroundCheck {
+
+ boolean check();
+
+ }
+
+ protected final ResourceResolverFactory resourceResolverFactory;
+
+ protected final String slingId;
+
+ private final long syncTokenTimeoutMillis;
+
+ protected BackgroundCheckRunnable backgroundCheckRunnable;
+
+ public SyncTokenConsistencyService(ResourceResolverFactory resourceResolverFactory,
+ String slingId, long syncTokenTimeoutMillis) {
+ if (resourceResolverFactory == null) {
+ throw new IllegalArgumentException("resourceResolverFactory must not be null");
+ }
+ if (slingId == null || slingId.length() == 0) {
+ throw new IllegalArgumentException("slingId must not be null or empty: "+slingId);
+ }
+ this.slingId = slingId;
+ this.resourceResolverFactory = resourceResolverFactory;
+ this.syncTokenTimeoutMillis = syncTokenTimeoutMillis;
+ }
+
+ protected void cancelPreviousBackgroundCheck() {
+ BackgroundCheckRunnable current = backgroundCheckRunnable;
+ if (current!=null) {
+ current.cancel();
+ // leave backgroundCheckRunnable field as is
+ // as that does not represent a memory leak
+ // nor is it a problem to invoke cancel multiple times
+ // but properly synchronizing on just setting backgroundCheckRunnable
+ // back to null is error-prone and overkill
+ }
+ }
+
+ protected void startBackgroundCheck(String threadName, final BackgroundCheck check, final Runnable callback, final long timeoutMillis) {
+ // cancel the current one if it's still running
+ cancelPreviousBackgroundCheck();
+
+ if (check.check()) {
+ // then we're not even going to start the background-thread
+ // we're already done
+ logger.info("backgroundCheck: already done, backgroundCheck successful, invoking callback");
+ callback.run();
+ return;
+ }
+ logger.info("backgroundCheck: spawning background-thread for '"+threadName+"'");
+ backgroundCheckRunnable = new BackgroundCheckRunnable(callback, check, timeoutMillis, threadName);
+ Thread th = new Thread(backgroundCheckRunnable);
+ th.setName(threadName);
+ th.setDaemon(true);
+ th.start();
+ }
+
+ /** Get or create a ResourceResolver **/
+ protected ResourceResolver getResourceResolver() throws LoginException {
+ return resourceResolverFactory.getAdministrativeResourceResolver(null);
+ }
+
+ @Override
+ public void sync(BaseTopologyView view, Runnable callback) {
+ // cancel the previous background-check if it's still running
+ cancelPreviousBackgroundCheck();
+
+ syncToken(view, callback);
+ // this implementation doesn't support wait-for-backlog, so
+ // the above doSyncTokenPart will already terminate with invoking the callback
+ }
+
+ protected void syncToken(final BaseTopologyView view, final Runnable callback) {
+ // 1) first storing my syncToken
+ try {
+ storeMySyncToken(view.getLocalClusterSyncTokenId());
+ } catch (LoginException e) {
+ logger.error("syncToken: will run into timeout: could not login for storing my syncToken: "+e, e);
+ } catch (PersistenceException e) {
+ logger.error("syncToken: will run into timeout: got PersistenceException while storing my syncToken: "+e, e);
+ }
+ // if anything goes wrong above, then this will mean for the others
+ // that they will have to wait until the timeout hits
+ // which means we should do the same..
+ // hence no further action possible on error above
+
+ // 2) then check if all others have done the same already
+ startBackgroundCheck("SyncTokenConsistencyService", new BackgroundCheck() {
+
+ @Override
+ public boolean check() {
+ return seenAllSyncTokens(view);
+ }
+ }, callback, syncTokenTimeoutMillis);
+ }
+
+ private void storeMySyncToken(String syncTokenId) throws LoginException, PersistenceException {
+ logger.trace("storeMySyncToken: start");
+ ResourceResolver resourceResolver = null;
+ try{
+ resourceResolver = getResourceResolver();
+ final Resource resource = getOrCreateResource(resourceResolver, SYNCTOKEN_PATH);
+ ModifiableValueMap syncTokens = resource.adaptTo(ModifiableValueMap.class);
+ Object currentValue = syncTokens.get(slingId);
+ if (currentValue == null || !syncTokenId.equals(currentValue)) {
+ syncTokens.put(slingId, syncTokenId);
+ }
+ resourceResolver.commit();
+ logger.info("syncToken: stored syncToken of slingId="+slingId+" as="+syncTokenId);
+ } finally {
+ logger.trace("storeMySyncToken: end");
+ if (resourceResolver!=null) {
+ resourceResolver.close();
+ }
+ }
+ }
+
+ private boolean seenAllSyncTokens(BaseTopologyView view) {
+ logger.trace("seenAllSyncTokens: start");
+ ResourceResolver resourceResolver = null;
+ try{
+ resourceResolver = getResourceResolver();
+ Resource resource = getOrCreateResource(resourceResolver, SYNCTOKEN_PATH);
+ ValueMap syncTokens = resource.adaptTo(ValueMap.class);
+ String syncToken = view.getLocalClusterSyncTokenId();
+
+ for (InstanceDescription instance : view.getLocalInstance().getClusterView().getInstances()) {
+ Object currentValue = syncTokens.get(instance.getSlingId());
+ if (currentValue == null) {
+ logger.info("seenAllSyncTokens: no syncToken of "+instance);
+ return false;
+ }
+ if (!syncToken.equals(currentValue)) {
+ logger.info("seenAllSyncTokens: old syncToken of " + instance
+ + " : expected=" + syncToken + " got="+currentValue);
+ return false;
+ }
+ }
+
+ resourceResolver.commit();
+ logger.info("seenAllSyncTokens: seen all syncTokens!");
+ return true;
+ } catch (LoginException e) {
+ logger.error("seenAllSyncTokens: could not login: "+e, e);
+ return false;
+ } catch (PersistenceException e) {
+ logger.error("seenAllSyncTokens: got PersistenceException: "+e, e);
+ return false;
+ } finally {
+ logger.trace("seenAllSyncTokens: end");
+ if (resourceResolver!=null) {
+ resourceResolver.close();
+ }
+ }
+ }
+
+ /** for testing only! **/
+ protected void triggerBackgroundCheck() {
+ BackgroundCheckRunnable backgroundOp = backgroundCheckRunnable;
+ if (backgroundOp!=null) {
+ backgroundOp.triggerCheck();
+ }
+ }
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/main/java/org/apache/sling/discovery/commons/providers/spi/impl/SyncTokenConsistencyService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClusterTest {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private List<ViewStateManagerImpl> mgrList;
+
+ private Random defaultRandom;
+
+ @Before
+ public void setup() throws Exception {
+ mgrList = new LinkedList<ViewStateManagerImpl>();
+ defaultRandom = new Random(1234123412); // I want randomness yes, but deterministic, for some methods at least
+ }
+
+ @After
+ public void teardown() throws Exception {
+ mgrList = null;
+ defaultRandom= null;
+ }
+
+ private ViewStateManagerImpl newMgr() {
+ ViewStateManagerImpl mgr = new ViewStateManagerImpl(new ReentrantLock(), new ConsistencyService() {
+
+ public void sync(BaseTopologyView view, Runnable callback) {
+ callback.run();
+ }
+ });
+ mgrList.add(mgr);
+ return mgr;
+ }
+
+ private void waitForInflightEvents(ViewStateManagerImpl mgr) throws InterruptedException {
+ if (mgr==null) {
+ throw new IllegalArgumentException("mgr must not be null");
+ }
+ if (mgr.getAsyncEventSender()==null) {
+ logger.info("waitForInflightEvents: mgr not yet activated...");
+ return;
+ }
+ while(mgr.getAsyncEventSender().hasInFlightEvent()) {
+ logger.info("waitForInflightEvents: waiting 10ms...");
+ Thread.sleep(10);
+ }
+ }
+
+ private void assertCountEvents(ViewStateManagerImpl mgr, Listener l, TopologyEvent.Type... types) throws InterruptedException {
+ waitForInflightEvents(mgr);
+ assertEquals(types.length, l.countEvents());
+ Iterator<TopologyEvent> it = l.getEvents().iterator();
+ int i=0;
+ while(it.hasNext() && (i<types.length)) {
+ TopologyEvent expectedEvent = it.next();
+ Type gotType = types[i++];
+ assertEquals(expectedEvent.getType(), gotType);
+ }
+ if (it.hasNext()) {
+ StringBuffer additionalTypes = new StringBuffer();
+ while(it.hasNext()) {
+ additionalTypes.append(",");
+ additionalTypes.append(it.next().getType());
+ }
+ fail("got more events than expected : "+additionalTypes);
+ }
+ if (i<types.length) {
+ StringBuffer additionalTypes = new StringBuffer();
+ while(i<types.length) {
+ additionalTypes.append(",");
+ additionalTypes.append(types[i++]);
+ }
+ fail("did not get all events, also expected : "+additionalTypes);
+ }
+ }
+
+ private void fail(String string) {
+ // TODO Auto-generated method stub
+
+ }
+
+ @Test
+ public void testTwoNodes() throws Exception {
+ final ViewStateManagerImpl mgr1 = newMgr();
+ final String slingId1 = UUID.randomUUID().toString();
+ final ViewStateManagerImpl mgr2 = newMgr();
+ final String slingId2 = UUID.randomUUID().toString();
+
+ // bind l1
+ Listener l1 = new Listener();
+ mgr1.bind(l1);
+ assertCountEvents(mgr1, l1);
+
+ // bind l2
+ Listener l2 = new Listener();
+ mgr2.bind(l2);
+ assertCountEvents(mgr2, l2);
+
+ // fiddle with l1 - without any events expected to be sent
+ mgr1.handleChanging();
+ assertCountEvents(mgr1, l1);
+ mgr1.handleActivated();
+ assertCountEvents(mgr1, l1);
+ mgr1.handleChanging();
+ assertCountEvents(mgr1, l1);
+
+ // fiddle with l2 - without any events expected to be sent
+ mgr2.handleChanging();
+ assertCountEvents(mgr2, l2);
+ mgr2.handleActivated();
+ assertCountEvents(mgr2, l2);
+ mgr2.handleChanging();
+ assertCountEvents(mgr2, l2);
+
+ // call handleNewView with not-current views first...
+ BaseTopologyView vA1 = TestHelper.newView(false, slingId1, slingId1, slingId1, slingId2);
+ mgr1.handleNewView(vA1);
+ assertCountEvents(mgr1, l1);
+ assertCountEvents(mgr2, l2);
+ BaseTopologyView vB1 = TestHelper.newView(false, slingId1, slingId2, slingId1, slingId2);
+ mgr2.handleNewView(vB1);
+ assertCountEvents(mgr1, l1);
+ assertCountEvents(mgr2, l2);
+
+ // then call handleNewView with a current view - that should now sent the INIT
+ BaseTopologyView vA2 = TestHelper.newView(true, slingId1, slingId1, slingId1, slingId2);
+ mgr1.handleNewView(vA2);
+ assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT);
+ assertCountEvents(mgr2, l2);
+ BaseTopologyView vB2 = TestHelper.newView(true, slingId1, slingId2, slingId1, slingId2);
+ mgr2.handleNewView(vB2);
+ assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT);
+ assertCountEvents(mgr2, l2, Type.TOPOLOGY_INIT);
+
+ // now let instance1 get decoupled from the cluster (pseudo-network-partitioning)
+ BaseTopologyView vB3 = TestHelper.newView(true, slingId2, slingId2, slingId2);
+ mgr2.handleNewView(vB3);
+ assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT);
+ assertCountEvents(mgr2, l2, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+
+ // now let instance1 take note of this decoupling
+ mgr1.handleChanging();
+ assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING);
+ assertCountEvents(mgr2, l2, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+
+ // and now let instance1 rejoin
+ BaseTopologyView vA4 = TestHelper.newView(true, slingId2, slingId1, slingId1, slingId2);
+ mgr1.handleNewView(vA4);
+ assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+ assertCountEvents(mgr2, l2, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+ BaseTopologyView vB4 = TestHelper.newView(true, slingId2, slingId2, slingId1, slingId2);
+ mgr2.handleNewView(vA4);
+ assertCountEvents(mgr1, l1, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+ assertCountEvents(mgr2, l2, Type.TOPOLOGY_INIT, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED, Type.TOPOLOGY_CHANGING, Type.TOPOLOGY_CHANGED);
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/ClusterTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+
+public class Listener implements TopologyEventListener {
+
+ private List<TopologyEvent> events = new LinkedList<TopologyEvent>();
+ private TopologyEvent lastEvent;
+
+ public synchronized void handleTopologyEvent(TopologyEvent event) {
+ events.add(event);
+ lastEvent = event;
+ }
+
+ public synchronized List<TopologyEvent> getEvents() {
+ return Collections.unmodifiableList(events);
+ }
+
+ public synchronized int countEvents() {
+ return events.size();
+ }
+
+ public synchronized TopologyEvent getLastEvent() {
+ return lastEvent;
+ }
+
+ public synchronized void clearEvents() {
+ events.clear();
+ }
+
+ public BaseTopologyView getLastView() {
+ if (lastEvent==null) {
+ return null;
+ } else {
+ switch(lastEvent.getType()) {
+ case TOPOLOGY_INIT:
+ case PROPERTIES_CHANGED:
+ case TOPOLOGY_CHANGED: {
+ return (BaseTopologyView) lastEvent.getNewView();
+ }
+ case TOPOLOGY_CHANGING:{
+ return (BaseTopologyView) lastEvent.getOldView();
+ }
+ default: {
+ fail("no other types supported yet");
+ }
+ }
+ }
+ return null;
+ }
+
+}
\ No newline at end of file
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/Listener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+
+public class SimpleClusterView implements ClusterView {
+
+ private final String id;
+ private List<InstanceDescription> instances = new LinkedList<InstanceDescription>();
+
+ public SimpleClusterView(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String getId() {
+ return id;
+ }
+
+ public void addInstanceDescription(InstanceDescription id) {
+ instances.add(id);
+ }
+
+ public boolean removeInstanceDescription(InstanceDescription id) {
+ return instances.remove(id);
+ }
+
+ @Override
+ public List<InstanceDescription> getInstances() {
+ return Collections.unmodifiableList(instances);
+ }
+
+ @Override
+ public InstanceDescription getLeader() {
+ for (InstanceDescription instanceDescription : instances) {
+ if (instanceDescription.isLeader()) {
+ return instanceDescription;
+ }
+ }
+ throw new IllegalStateException("no leader");
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleClusterView.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import org.apache.sling.discovery.DiscoveryService;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+
+public class SimpleDiscoveryService implements DiscoveryService {
+
+ private BaseTopologyView topologyView;
+
+ public void setTopoology(BaseTopologyView topologyView) {
+ this.topologyView = topologyView;
+ }
+
+ @Override
+ public TopologyView getTopology() {
+ return topologyView;
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleDiscoveryService.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+
+public class SimpleInstanceDescription implements InstanceDescription {
+
+ private ClusterView clusterView;
+ private final boolean isLeader;
+ private final boolean isLocal;
+ private final String slingId;
+ private final Map<String, String> properties;
+
+ public SimpleInstanceDescription(boolean isLeader, boolean isLocal, String slingId,
+ Map<String, String> properties) {
+ this.isLeader = isLeader;
+ this.isLocal = isLocal;
+ this.slingId = slingId;
+ this.properties = properties;
+ }
+
+ @Override
+ public String toString() {
+ return "Instance["+slingId+"]";
+ }
+
+ @Override
+ public int hashCode() {
+ return slingId.hashCode() + (isLeader?0:1) + (isLocal?0:1);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof SimpleInstanceDescription)) {
+ return false;
+ }
+ SimpleInstanceDescription other = (SimpleInstanceDescription) obj;
+ if (!slingId.equals(other.slingId)) {
+ return false;
+ }
+ if (isLeader!=other.isLeader) {
+ return false;
+ }
+ if (isLocal!=other.isLocal) {
+ return false;
+ }
+ return true;
+ }
+
+ public void setClusterView(ClusterView clusterView) {
+ this.clusterView = clusterView;
+ }
+
+ @Override
+ public ClusterView getClusterView() {
+ return clusterView;
+ }
+
+ @Override
+ public boolean isLeader() {
+ return isLeader;
+ }
+
+ @Override
+ public boolean isLocal() {
+ return isLocal;
+ }
+
+ @Override
+ public String getSlingId() {
+ return slingId;
+ }
+
+ @Override
+ public String getProperty(String name) {
+ return properties.get(name);
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ if (properties==null) {
+ return new HashMap<String, String>();
+ }
+ return new HashMap<String,String>(properties);
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleInstanceDescription.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.io.Serializable;
+import java.util.Date;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.sling.commons.scheduler.Scheduler;
+
+public class SimpleScheduler implements Scheduler {
+
+ private boolean failMode;
+
+ @Override
+ public void addJob(String name, Object job, Map<String, Serializable> config, String schedulingExpression,
+ boolean canRunConcurrently) throws Exception {
+ throw new IllegalStateException("not yet impl");
+ }
+
+ @Override
+ public void addPeriodicJob(String name, Object job, Map<String, Serializable> config, long period, boolean canRunConcurrently)
+ throws Exception {
+ throw new IllegalStateException("not yet impl");
+ }
+
+ @Override
+ public void addPeriodicJob(String name, Object job, Map<String, Serializable> config, long period, boolean canRunConcurrently,
+ boolean startImmediate) throws Exception {
+ throw new IllegalStateException("not yet impl");
+ }
+
+ @Override
+ public void fireJob(Object job, Map<String, Serializable> config) throws Exception {
+ throw new IllegalStateException("not yet impl");
+ }
+
+ @Override
+ public boolean fireJob(Object job, Map<String, Serializable> config, int times, long period) {
+ throw new IllegalStateException("not yet impl");
+ }
+
+ @Override
+ public void fireJobAt(String name, final Object job, Map<String, Serializable> config, final Date date) throws Exception {
+ if (!(job instanceof Runnable)) {
+ throw new IllegalArgumentException("only runnables supported for now");
+ }
+ final Runnable j = (Runnable)job;
+ Runnable r = new Runnable() {
+
+ @Override
+ public void run() {
+ while (System.currentTimeMillis()<date.getTime()) {
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ Thread.yield();
+ }
+ }
+ j.run();
+ }
+
+ };
+ async(r, name);
+ }
+
+ private void async(Runnable r, String name) {
+ if (failMode) {
+ throw new IllegalStateException("failMode");
+ }
+ Thread th = new Thread(r);
+ th.setName("async test thread for "+name);
+ th.setDaemon(true);
+ th.start();
+ }
+
+ @Override
+ public boolean fireJobAt(String name, Object job, Map<String, Serializable> config, Date date, int times, long period) {
+ throw new IllegalStateException("not yet impl");
+ }
+
+ @Override
+ public void removeJob(String name) throws NoSuchElementException {
+ throw new IllegalStateException("not yet impl");
+ }
+
+ public void failMode() {
+ failMode = true;
+ }
+
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleScheduler.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.sling.discovery.ClusterView;
+import org.apache.sling.discovery.InstanceDescription;
+import org.apache.sling.discovery.InstanceFilter;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+
+public class SimpleTopologyView extends BaseTopologyView {
+
+ private List<InstanceDescription> instances = new LinkedList<InstanceDescription>();
+
+ private final String id;
+
+ public SimpleTopologyView() {
+ id = UUID.randomUUID().toString();
+ }
+
+ public SimpleTopologyView(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof SimpleTopologyView)) {
+ return false;
+ }
+ final SimpleTopologyView other = (SimpleTopologyView) obj;
+ if (this==other) {
+ return true;
+ }
+ if (!id.equals(other.id)) {
+ return false;
+ }
+ if (this.instances.size()!=other.instances.size()) {
+ return false;
+ }
+ for (Iterator<InstanceDescription> it = instances.iterator(); it.hasNext();) {
+ InstanceDescription instanceDescription = (InstanceDescription) it
+ .next();
+ boolean found = false;
+ for (Iterator<?> it2 = other.instances.iterator(); it2
+ .hasNext();) {
+ InstanceDescription otherId = (InstanceDescription) it2
+ .next();
+ if (instanceDescription.equals(otherId)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int c=0;
+ for (Iterator<InstanceDescription> it = instances.iterator(); it.hasNext();) {
+ InstanceDescription instanceDescription = (InstanceDescription) it
+ .next();
+ c+=instanceDescription.hashCode();
+ }
+ return c;
+ }
+
+ public void addInstanceDescription(InstanceDescription id) {
+ instances.add(id);
+ }
+
+ @Override
+ public InstanceDescription getLocalInstance() {
+ InstanceDescription result = null;
+ for (Iterator<InstanceDescription> it = instances.iterator(); it.hasNext();) {
+ InstanceDescription instanceDescription = (InstanceDescription) it
+ .next();
+ if (instanceDescription.isLocal()) {
+ if (result!=null) {
+ throw new IllegalStateException("multiple local instances");
+ }
+ result = instanceDescription;
+ }
+ }
+ if (result==null) {
+ throw new IllegalStateException("no local instance found");
+ }
+ return result;
+ }
+
+ @Override
+ public Set<InstanceDescription> getInstances() {
+ return new HashSet<InstanceDescription>(instances);
+ }
+
+ @Override
+ public Set<InstanceDescription> findInstances(InstanceFilter filter) {
+ throw new IllegalStateException("not yet implemented");
+ }
+
+ @Override
+ public Set<ClusterView> getClusterViews() {
+ Set<ClusterView> clusters = new HashSet<ClusterView>();
+ for (InstanceDescription instanceDescription : instances) {
+ clusters.add(instanceDescription.getClusterView());
+ }
+ return clusters;
+ }
+
+ @Override
+ public String getLocalClusterSyncTokenId() {
+ return id;
+ }
+
+ public SimpleTopologyView addInstance() {
+ final String slingId = UUID.randomUUID().toString();
+ final String clusterId = UUID.randomUUID().toString();
+ final SimpleClusterView cluster = new SimpleClusterView(clusterId);
+ final SimpleInstanceDescription instance = new SimpleInstanceDescription(true, true, slingId, null);
+ instance.setClusterView(cluster);
+ cluster.addInstanceDescription(instance);
+ instances.add(instance);
+ return this;
+ }
+
+ public SimpleTopologyView addInstance(String slingId, SimpleClusterView cluster, boolean isLeader, boolean isLocal) {
+ final SimpleInstanceDescription instance = new SimpleInstanceDescription(isLeader, isLocal, slingId, null);
+ instance.setClusterView(cluster);
+ cluster.addInstanceDescription(instance);
+ instances.add(instance);
+ return this;
+ }
+
+ public SimpleTopologyView addInstance(InstanceDescription artefact) {
+ final String slingId = artefact.getSlingId();
+ final boolean isLeader = artefact.isLeader();
+ final boolean isLocal = artefact.isLocal();
+ SimpleClusterView cluster = (SimpleClusterView) artefact.getClusterView();
+ final SimpleInstanceDescription instance = new SimpleInstanceDescription(isLeader, isLocal, slingId, null);
+ instance.setClusterView(cluster);
+ cluster.addInstanceDescription(instance);
+ instances.add(instance);
+ return this;
+ }
+
+ public SimpleTopologyView removeInstance(String slingId) {
+ for (Iterator<InstanceDescription> it = instances.iterator(); it.hasNext();) {
+ InstanceDescription id = (InstanceDescription) it.next();
+ if (id.getSlingId().equals(slingId)) {
+ it.remove();
+ SimpleClusterView cluster = (SimpleClusterView) id.getClusterView();
+ if (!cluster.removeInstanceDescription(id)) {
+ throw new IllegalStateException("could not remove id: "+id);
+ }
+ return this;
+ }
+ }
+ throw new IllegalStateException("instance not found: "+slingId);
+ }
+
+ public static SimpleTopologyView clone(final SimpleTopologyView view) {
+ final SimpleTopologyView result = new SimpleTopologyView(view.id);
+ final Iterator<InstanceDescription> it = view.getInstances().iterator();
+ Map<String,SimpleClusterView> clusters = new HashMap<String, SimpleClusterView>();
+ while(it.hasNext()) {
+ InstanceDescription id = it.next();
+ SimpleInstanceDescription clone = clone(id);
+ String clusterId = id.getClusterView().getId();
+ SimpleClusterView cluster = clusters.get(clusterId);
+ if (cluster==null) {
+ cluster = new SimpleClusterView(clusterId);
+ clusters.put(clusterId, cluster);
+ }
+ clone.setClusterView(cluster);
+ result.addInstance(clone);
+ }
+ return result;
+ }
+
+ private static SimpleInstanceDescription clone(InstanceDescription id) {
+ return new SimpleInstanceDescription(id.isLeader(), id.isLocal(), id.getSlingId(), id.getProperties());
+ }
+
+ public SimpleTopologyView clone() {
+ return SimpleTopologyView.clone(this);
+ }
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/SimpleTopologyView.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+import java.util.Random;
+import java.util.UUID;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.EventFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestHelper {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestHelper.class);
+
+ public static void assertEvents(ViewStateManagerImpl mgr, Listener listener, TopologyEvent... events) {
+ waitForAsyncEvents(mgr);
+ assertEquals(events.length, listener.countEvents());
+ for (int i = 0; i < events.length; i++) {
+ TopologyEvent e = events[i];
+ assertEquals(e.getType(), listener.getEvents().get(i).getType());
+ switch(e.getType()) {
+ case TOPOLOGY_INIT: {
+ assertNull(listener.getEvents().get(i).getOldView());
+ assertEquals(e.getNewView(), listener.getEvents().get(i).getNewView());
+ break;
+ }
+ case TOPOLOGY_CHANGING: {
+ assertEquals(e.getOldView(), listener.getEvents().get(i).getOldView());
+ assertNull(listener.getEvents().get(i).getNewView());
+ break;
+ }
+ case PROPERTIES_CHANGED:
+ case TOPOLOGY_CHANGED: {
+ assertEquals(e.getOldView(), listener.getEvents().get(i).getOldView());
+ assertEquals(e.getNewView(), listener.getEvents().get(i).getNewView());
+ break;
+ }
+ default: {
+ fail("no other type supported yet");
+ }
+ }
+ }
+ listener.clearEvents();
+ }
+
+ public static void waitForAsyncEvents(ViewStateManagerImpl mgr) {
+ int sleep = 1;
+ while(true) {
+ if (!mgr.getAsyncEventSender().hasInFlightEvent()) {
+ return;
+ }
+
+ // sleep outside of synchronized to keep test-influence
+ // to a minimum
+ try {
+ Thread.sleep(sleep);
+ } catch (InterruptedException e) {
+ logger.error("waitForFlush: got interrupted: "+e, e);
+ }
+ // minor back-off up until 20ms
+ sleep = Math.min(20, sleep+1);
+ }
+ }
+
+ public static void assertNoEvents(Listener listener) {
+ assertEquals(0, listener.countEvents());
+ }
+
+ /** does couple loops randomly calling handleChanging() (or not) and then handleNewView().
+ * Note: random is passed to allow customizing and not hardcoding this method to a particular random
+ * @throws InterruptedException **/
+ public static void randomEventLoop(ViewStateManagerImpl mgr, SimpleDiscoveryService sds, int loopSize, int delayInMillis, final Random random, Listener... listeners) throws InterruptedException {
+ for(int i=0; i<loopSize; i++) {
+ final boolean shouldCallChanging = random.nextBoolean();
+ if (shouldCallChanging) {
+ // dont always do a changing
+ logger.debug("randomEventLoop: calling handleChanging...");
+ mgr.handleChanging();
+ // must first wait for async events to have been processed - as otherwise
+ // the 'getLastView()' might not return the correct view
+ logger.debug("randomEventLoop: waiting for async events....");
+ waitForAsyncEvents(mgr);
+ logger.debug("randomEventLoop: asserting CHANGING event was sent...");
+ for(int j=0; j<listeners.length; j++) {
+ assertEvents(mgr, listeners[j], EventFactory.newChangingEvent(listeners[j].getLastView()));
+ }
+ } else {
+ logger.debug("randomEventLoop: asserting no events...");
+ for(int j=0; j<listeners.length; j++) {
+ assertNoEvents(listeners[j]);
+ }
+ }
+ final BaseTopologyView view = new SimpleTopologyView().addInstance();
+ BaseTopologyView[] lastViews = new BaseTopologyView[listeners.length];
+ for(int j=0; j<listeners.length; j++) {
+ lastViews[j] = listeners[j].getLastView();
+ }
+ logger.debug("randomEventLoop: calling handleNewView");
+ if (sds!=null) {
+ sds.setTopoology(view);
+ }
+ mgr.handleNewView(view);
+ if (delayInMillis>0) {
+ logger.debug("randomEventLoop: waiting "+delayInMillis+"ms ...");
+ Thread.sleep(delayInMillis);
+ logger.debug("randomEventLoop: waiting "+delayInMillis+"ms done.");
+ }
+ if (!shouldCallChanging) {
+ // in that case I should still get a CHANGING - by contract
+ logger.debug("randomEventLoop: asserting CHANGING, CHANGED events were sent");
+ for(int j=0; j<listeners.length; j++) {
+ assertEvents(mgr, listeners[j], EventFactory.newChangingEvent(lastViews[j]), EventFactory.newChangedEvent(lastViews[j], view));
+ }
+ } else {
+ logger.debug("randomEventLoop: asserting CHANGED event was sent");
+ for(int j=0; j<listeners.length; j++) {
+ assertEvents(mgr, listeners[j], EventFactory.newChangedEvent(lastViews[j], view));
+ }
+ }
+ }
+ }
+
+ public static SimpleTopologyView newView(boolean isCurrent, String leaderId, String localId, String... slingIds) {
+ return newView(UUID.randomUUID().toString(), UUID.randomUUID().toString(), isCurrent, leaderId, localId, slingIds);
+ }
+
+ public static SimpleTopologyView newView(String syncId, String clusterId, boolean isCurrent, String leaderId, String localId, String... slingIds) {
+ SimpleTopologyView topology = new SimpleTopologyView(syncId);
+ SimpleClusterView cluster = new SimpleClusterView(clusterId);
+ for (String slingId : slingIds) {
+ SimpleInstanceDescription id = new SimpleInstanceDescription(
+ slingId.equals(leaderId), slingId.equals(localId), slingId, null);
+ id.setClusterView(cluster);
+ cluster.addInstanceDescription(id);
+ topology.addInstanceDescription(id);
+ }
+ if (!isCurrent) {
+ topology.setNotCurrent();
+ }
+ return topology;
+ }
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java
URL: http://svn.apache.org/viewvc/sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java?rev=1707548&view=auto
==============================================================================
--- sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java (added)
+++ sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java Thu Oct 8 14:31:45 2015
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Random;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.EventFactory;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestMinEventDelayHandler {
+
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ private ViewStateManagerImpl mgr;
+
+ private Random defaultRandom;
+
+ private SimpleDiscoveryService sds;
+
+ private Level logLevel;
+
+ private SimpleScheduler scheduler;
+
+ @Before
+ public void setup() throws Exception {
+ mgr = new ViewStateManagerImpl(new ReentrantLock(), new ConsistencyService() {
+
+ public void sync(BaseTopologyView view, Runnable callback) {
+ callback.run();
+ }
+ });
+ defaultRandom = new Random(1234123412); // I want randomness yes, but deterministic, for some methods at least
+
+ scheduler = new SimpleScheduler();
+ sds = new SimpleDiscoveryService();
+ mgr.installMinEventDelayHandler(sds, scheduler, 1);
+
+ final org.apache.log4j.Logger discoveryLogger = LogManager.getRootLogger().getLogger("org.apache.sling.discovery");
+ logLevel = discoveryLogger.getLevel();
+ discoveryLogger.setLevel(Level.DEBUG);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ mgr = null;
+ defaultRandom= null;
+ final org.apache.log4j.Logger discoveryLogger = LogManager.getRootLogger().getLogger("org.apache.sling.discovery");
+ discoveryLogger.setLevel(logLevel);
+ }
+
+ private void assertNoEvents(Listener listener) {
+ assertEquals(0, listener.countEvents());
+ }
+
+ @Test
+ public void testNormalDelaying() throws Exception {
+ final Listener listener = new Listener();
+ // first activate
+ logger.info("testNormalDelaying: calling handleActivated...");
+ mgr.handleActivated();
+ assertNoEvents(listener); // paranoia
+ // then bind
+ logger.info("testNormalDelaying: calling bind...");
+ mgr.bind(listener);
+ assertNoEvents(listener); // there was no changing or changed yet
+ logger.info("testNormalDelaying: calling handleChanging...");
+ mgr.handleChanging();
+ assertNoEvents(listener);
+ final BaseTopologyView view = new SimpleTopologyView().addInstance();
+ logger.info("testNormalDelaying: calling handleNewView...");
+ mgr.handleNewView(view);
+ TestHelper.assertEvents(mgr, listener, EventFactory.newInitEvent(view));
+ for(int i=0; i<7; i++) {
+ logger.info("testNormalDelaying: calling randomEventLoop...");
+ TestHelper.randomEventLoop(mgr, sds, 4, 1500, defaultRandom, listener);
+ Thread.sleep(1000);
+ }
+ }
+
+ @Test
+ public void testFailedDelaying() throws Exception {
+ scheduler.failMode();
+ final Listener listener = new Listener();
+ // first activate
+ mgr.handleActivated();
+ assertNoEvents(listener); // paranoia
+ // then bind
+ mgr.bind(listener);
+ assertNoEvents(listener); // there was no changing or changed yet
+ mgr.handleChanging();
+ assertNoEvents(listener);
+ final BaseTopologyView view = new SimpleTopologyView().addInstance();
+ mgr.handleNewView(view);
+ TestHelper.assertEvents(mgr, listener, EventFactory.newInitEvent(view));
+ for(int i=0; i<7; i++) {
+ TestHelper.randomEventLoop(mgr, sds, 100, -1, defaultRandom, listener);
+ Thread.sleep(1000);
+ }
+ }
+}
Propchange: sling/trunk/bundles/extensions/discovery/commons/src/test/java/org/apache/sling/discovery/commons/providers/impl/TestMinEventDelayHandler.java
------------------------------------------------------------------------------
svn:eol-style = native