You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sling.apache.org by ro...@apache.org on 2017/11/07 09:26:26 UTC
[sling-org-apache-sling-discovery-commons] 23/38: SLING-5173 :
introducing a more explicit chain concept for ConsistencyServices than the
previous hidden/implicit one: ConsistencyServiceChain
This is an automated email from the ASF dual-hosted git repository.
rombert pushed a commit to annotated tag org.apache.sling.discovery.commons-1.0.0
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-discovery-commons.git
commit 36ac9663903dee76f89f7472f5b1287db61f3344
Author: Stefan Egli <st...@apache.org>
AuthorDate: Thu Oct 22 15:31:45 2015 +0000
SLING-5173 : introducing a more explicit chain concept for ConsistencyServices than the previous hidden/implicit one: ConsistencyServiceChain
git-svn-id: https://svn.apache.org/repos/asf/sling/trunk/bundles/extensions/discovery/commons@1710035 13f79535-47bb-0310-9956-ffa450edef68
---
.../base/AbstractServiceWithBackgroundCheck.java | 65 ++++++++++
.../spi/base/ConsistencyServiceChain.java | 82 +++++++++++++
...vice.java => OakBacklogConsistencyService.java} | 47 ++++----
...rvice.java => SyncTokenConsistencyService.java} | 131 ++++++++++-----------
.../spi/base/SyncTokenOnlyConsistencyService.java | 101 ----------------
5 files changed, 229 insertions(+), 197 deletions(-)
diff --git a/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/AbstractServiceWithBackgroundCheck.java b/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/AbstractServiceWithBackgroundCheck.java
index 8ccae9f..3e8b93b 100644
--- a/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/AbstractServiceWithBackgroundCheck.java
+++ b/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/AbstractServiceWithBackgroundCheck.java
@@ -18,6 +18,17 @@
*/
package org.apache.sling.discovery.commons.providers.spi.base;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -28,8 +39,21 @@ import org.slf4j.LoggerFactory;
*/
public abstract class AbstractServiceWithBackgroundCheck {
+ /**
+  * One entry of the bounded sync history kept by this class: the view
+  * the message was recorded for, the raw message, and the timestamped
+  * line that getSyncHistory() reports.
+  */
+ class HistoryEntry {
+ /** the view the message belongs to - compared against new entries to detect duplicates */
+ BaseTopologyView view;
+ /** the message without timestamp - compared against new entries to detect duplicates */
+ String msg;
+ /** the message prefixed with the formatted time of recording */
+ String fullLine;
+ }
+
+ /** the date format used in the truncated log of topology events **/
+ private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
+
protected final Logger logger = LoggerFactory.getLogger(getClass());
+ protected String slingId;
+
+ protected List<HistoryEntry> history = new LinkedList<HistoryEntry>();
+
/**
* The BackgroundCheckRunnable implements the details of
* calling BackgroundCheck.check and looping until it
@@ -215,4 +239,45 @@ public abstract class AbstractServiceWithBackgroundCheck {
backgroundOp.triggerCheck();
}
}
+
+    /**
+     * Returns the timestamped lines of the sync history, oldest first.
+     * <p>
+     * The entries are copied while holding the history lock, so the
+     * result can be assembled without racing against concurrent
+     * addHistoryEntry() calls.
+     *
+     * @return a new list containing the fullLine of each history entry
+     */
+    public List<String> getSyncHistory() {
+        final List<HistoryEntry> snapshot;
+        synchronized(history) {
+            // a real copy is required here: Collections.unmodifiableList() would
+            // only be a read-only view of the live list, and iterating such a view
+            // outside of the lock can fail with a ConcurrentModificationException
+            snapshot = new ArrayList<HistoryEntry>(history);
+        }
+        final List<String> result = new ArrayList<String>(snapshot.size());
+        for (HistoryEntry historyEntry : snapshot) {
+            result.add(historyEntry.fullLine);
+        }
+        return result;
+    }
+
+    /**
+     * Records msg for the given view in the sync history, unless the same
+     * msg was already recorded among the most recent entries belonging to
+     * an equal view. The history is trimmed to its 12 newest entries.
+     *
+     * @param view the view the message relates to
+     * @param msg the message to record (stored with a timestamp prefix)
+     */
+    protected void addHistoryEntry(BaseTopologyView view, String msg) {
+        synchronized(history) {
+            // walk backwards through the trailing entries that share the view
+            int idx = history.size();
+            while (--idx >= 0) {
+                final HistoryEntry previous = history.get(idx);
+                if (!previous.view.equals(view)) {
+                    // an entry of a different view ends the
+                    // range in which duplicates are suppressed
+                    break;
+                }
+                if (previous.msg.equals(msg)) {
+                    // same view, same message: a duplicate, nothing to add
+                    return;
+                }
+            }
+            final HistoryEntry added = new HistoryEntry();
+            added.view = view;
+            added.msg = msg;
+            added.fullLine = sdf.format(Calendar.getInstance().getTime()) + ": " + msg;
+            history.add(added);
+            // drop the oldest entries beyond the limit
+            while (history.size() > 12) {
+                history.remove(0);
+            }
+        }
+    }
+
}
diff --git a/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/ConsistencyServiceChain.java b/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/ConsistencyServiceChain.java
new file mode 100644
index 0000000..7695fe6
--- /dev/null
+++ b/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/ConsistencyServiceChain.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.discovery.commons.providers.spi.base;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.sling.discovery.commons.providers.BaseTopologyView;
+import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Allows chaining of ConsistencyServices, itself implementing
+ * the ConsistencyService interface.
+ * <p>
+ * On sync() the chained services are invoked one after the other: each
+ * service is handed a callback which triggers the next one, and the
+ * caller's callback is run once the end of the chain is reached.
+ */
+public class ConsistencyServiceChain implements ConsistencyService {
+
+    protected final Logger logger = LoggerFactory.getLogger(getClass());
+
+    /** the chained services in invocation order - fixed at construction time */
+    private final List<ConsistencyService> chain;
+
+    /**
+     * Creates a new chain of ConsistencyServices that calls a
+     * cascaded sync with the provided ConsistencyServices.
+     *
+     * @param chain the services to chain, in the order they are to be synced
+     * @throws IllegalArgumentException if chain is null, empty or
+     * contains a null element
+     */
+    public ConsistencyServiceChain(ConsistencyService... chain) {
+        if (chain==null || chain.length==0) {
+            throw new IllegalArgumentException("chain must be 1 or more");
+        }
+        // work on a copy: Arrays.asList() is only a view of the array it is given,
+        // so later changes to the caller's array would otherwise alter the chain
+        final ConsistencyService[] copy = chain.clone();
+        for (int i = 0; i < copy.length; i++) {
+            if (copy[i] == null) {
+                // fail fast rather than with a NullPointerException half way through a sync
+                throw new IllegalArgumentException("chain must not contain null (index=" + i + ")");
+            }
+        }
+        this.chain = Arrays.asList(copy);
+    }
+
+    /**
+     * Syncs the given view with each chained service in turn and runs
+     * the callback after the last one has signalled completion.
+     */
+    @Override
+    public void sync(BaseTopologyView view, Runnable callback) {
+        final Iterator<ConsistencyService> chainIt = chain.iterator();
+        chainedSync(view, callback, chainIt);
+    }
+
+    /**
+     * Syncs with the next service of the iterator, passing it a Runnable
+     * that continues with the remainder of the chain - or runs the
+     * callback if the iterator is exhausted.
+     */
+    private void chainedSync(final BaseTopologyView view, final Runnable callback,
+            final Iterator<ConsistencyService> chainIt) {
+        if (!chainIt.hasNext()) {
+            logger.debug("doSync: done with sync chain, invoking callback");
+            callback.run();
+            return;
+        }
+        ConsistencyService next = chainIt.next();
+        next.sync(view, new Runnable() {
+
+            @Override
+            public void run() {
+                // this service is done: continue with the rest of the chain
+                chainedSync(view, callback, chainIt);
+            }
+
+        });
+    }
+
+    /** Forwards the cancel to every service of the chain. */
+    @Override
+    public void cancelSync() {
+        for (ConsistencyService consistencyService : chain) {
+            consistencyService.cancelSync();
+        }
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/OakSyncTokenConsistencyService.java b/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/OakBacklogConsistencyService.java
similarity index 89%
rename from src/main/java/org/apache/sling/discovery/commons/providers/spi/base/OakSyncTokenConsistencyService.java
rename to src/main/java/org/apache/sling/discovery/commons/providers/spi/base/OakBacklogConsistencyService.java
index 55c3411..48eb477 100644
--- a/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/OakSyncTokenConsistencyService.java
+++ b/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/OakBacklogConsistencyService.java
@@ -36,13 +36,12 @@ import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
import org.apache.sling.settings.SlingSettingsService;
/**
- * Inherits the 'sync-token' part from the SyncTokenConsistencyService
- * and adds the 'wait while backlog' part to it, based on
- * the Oak discovery-lite descriptor.
+ * The OakBacklogConsistencyService will wait until all instances
+ * in the local cluster are no longer in any backlog state.
*/
@Component(immediate = false)
-@Service(value = { ConsistencyService.class, OakSyncTokenConsistencyService.class })
-public class OakSyncTokenConsistencyService extends BaseSyncTokenConsistencyService {
+@Service(value = { ConsistencyService.class, OakBacklogConsistencyService.class })
+public class OakBacklogConsistencyService extends AbstractServiceWithBackgroundCheck implements ConsistencyService {
static enum BacklogStatus {
UNDEFINED /* when there was an error retrieving the backlog status with oak */,
@@ -62,12 +61,12 @@ public class OakSyncTokenConsistencyService extends BaseSyncTokenConsistencyServ
@Reference
protected SlingSettingsService settingsService;
- public static OakSyncTokenConsistencyService testConstructorAndActivate(
+ public static OakBacklogConsistencyService testConstructorAndActivate(
final DiscoveryLiteConfig commonsConfig,
final IdMapService idMapService,
final SlingSettingsService settingsService,
ResourceResolverFactory resourceResolverFactory) {
- OakSyncTokenConsistencyService service = testConstructor(commonsConfig, idMapService, settingsService, resourceResolverFactory);
+ OakBacklogConsistencyService service = testConstructor(commonsConfig, idMapService, settingsService, resourceResolverFactory);
service.activate();
return service;
}
@@ -84,12 +83,12 @@ public class OakSyncTokenConsistencyService extends BaseSyncTokenConsistencyServ
* @throws LoginException when the login for initialization failed
* @throws JSONException when the descriptor wasn't proper json at init time
*/
- public static OakSyncTokenConsistencyService testConstructor(
+ public static OakBacklogConsistencyService testConstructor(
final DiscoveryLiteConfig commonsConfig,
final IdMapService idMapService,
final SlingSettingsService settingsService,
ResourceResolverFactory resourceResolverFactory) {
- OakSyncTokenConsistencyService service = new OakSyncTokenConsistencyService();
+ OakBacklogConsistencyService service = new OakBacklogConsistencyService();
if (commonsConfig == null) {
throw new IllegalArgumentException("commonsConfig must not be null");
}
@@ -112,6 +111,16 @@ public class OakSyncTokenConsistencyService extends BaseSyncTokenConsistencyServ
logger.info("activate: activated with slingId="+slingId);
}
+    /**
+     * Obtains an administrative ResourceResolver from the
+     * resourceResolverFactory, passing null as authentication info.
+     *
+     * @return the resolver as handed out by the factory
+     * @throws LoginException if thrown by the factory
+     */
+    protected ResourceResolver getResourceResolver() throws LoginException {
+        final ResourceResolver adminResolver = resourceResolverFactory.getAdministrativeResourceResolver(null);
+        return adminResolver;
+    }
+
+ /**
+  * Cancels a sync by delegating to cancelPreviousBackgroundCheck().
+  */
+ @Override
+ public void cancelSync() {
+ cancelPreviousBackgroundCheck();
+ }
+
@Override
public void sync(final BaseTopologyView view, final Runnable callback) {
// cancel the previous backgroundCheck if it's still running
@@ -119,16 +128,7 @@ public class OakSyncTokenConsistencyService extends BaseSyncTokenConsistencyServ
// first do the wait-for-backlog part
logger.info("sync: doing wait-for-backlog part for view="+view.toShortString());
- waitWhileBacklog(view, new Runnable() {
-
- @Override
- public void run() {
- // when done, then do the sync-token part
- logger.info("sync: doing sync-token part for view="+view.toShortString());
- syncToken(view, callback);
- }
-
- });
+ waitWhileBacklog(view, callback);
}
private void waitWhileBacklog(final BaseTopologyView view, final Runnable runnable) {
@@ -234,19 +234,12 @@ public class OakSyncTokenConsistencyService extends BaseSyncTokenConsistencyServ
}
}
- @Override
protected DiscoveryLiteConfig getCommonsConfig() {
return commonsConfig;
}
- @Override
- protected ResourceResolverFactory getResourceResolverFactory() {
- return resourceResolverFactory;
- }
-
- @Override
protected SlingSettingsService getSettingsService() {
return settingsService;
}
-
+
}
diff --git a/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/BaseSyncTokenConsistencyService.java b/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/SyncTokenConsistencyService.java
similarity index 72%
rename from src/main/java/org/apache/sling/discovery/commons/providers/spi/base/BaseSyncTokenConsistencyService.java
rename to src/main/java/org/apache/sling/discovery/commons/providers/spi/base/SyncTokenConsistencyService.java
index ba6d6ec..3750946 100644
--- a/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/BaseSyncTokenConsistencyService.java
+++ b/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/SyncTokenConsistencyService.java
@@ -18,13 +18,10 @@
*/
package org.apache.sling.discovery.commons.providers.spi.base;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
@@ -37,43 +34,79 @@ import org.apache.sling.discovery.commons.providers.BaseTopologyView;
import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
import org.apache.sling.discovery.commons.providers.util.ResourceHelper;
import org.apache.sling.settings.SlingSettingsService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * Implements the 'sync-token' part of the ConsistencyService,
- * but not the 'wait while backlog' part (which is left to subclasses
- * if needed).
+ * Implements the syncToken idea: each instance stores a key-value
+ * pair with key=slingId and value=discoveryLiteSequenceNumber
+ * under /var/discovery/oak/syncTokens - and then waits until it
+ * sees the same token from all other instances in the cluster.
+ * This way, once the syncToken is received the local instance
+ * knows that all instances in the cluster are now in TOPOLOGY_CHANGING state
+ * (thus all topology-dependent activity is now stalled and waiting)
+ * and are aware of the new discoveryLite view.
*/
-public abstract class BaseSyncTokenConsistencyService extends AbstractServiceWithBackgroundCheck implements ConsistencyService {
+@Component(immediate = false)
+@Service(value = { ConsistencyService.class, SyncTokenConsistencyService.class })
+public class SyncTokenConsistencyService extends AbstractServiceWithBackgroundCheck implements ConsistencyService {
- class HistoryEntry {
- BaseTopologyView view;
- String msg;
- String fullLine;
- }
-
- /** the date format used in the truncated log of topology events **/
- private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss z");
+ protected final Logger logger = LoggerFactory.getLogger(getClass());
- protected String slingId;
+ @Reference
+ protected DiscoveryLiteConfig commonsConfig;
- protected List<HistoryEntry> history = new LinkedList<HistoryEntry>();
-
- protected abstract DiscoveryLiteConfig getCommonsConfig();
+ @Reference
+ protected ResourceResolverFactory resourceResolverFactory;
- protected abstract ResourceResolverFactory getResourceResolverFactory();
+ @Reference
+ protected SlingSettingsService settingsService;
- protected abstract SlingSettingsService getSettingsService();
+    /**
+     * Creates a service via testConstructor() and invokes activate() on it
+     * before returning it. Presumably intended for tests, as the name suggests.
+     */
+    public static SyncTokenConsistencyService testConstructorAndActivate(
+            DiscoveryLiteConfig commonsConfig,
+            ResourceResolverFactory resourceResolverFactory,
+            SlingSettingsService settingsService) {
+        final SyncTokenConsistencyService activated =
+                testConstructor(commonsConfig, resourceResolverFactory, settingsService);
+        activated.activate();
+        return activated;
+    }
+
+    /**
+     * Creates a SyncTokenConsistencyService with the given services set
+     * directly on its fields, without invoking activate().
+     *
+     * @throws IllegalArgumentException if any of the three arguments is null
+     */
+    public static SyncTokenConsistencyService testConstructor(
+            DiscoveryLiteConfig commonsConfig,
+            ResourceResolverFactory resourceResolverFactory,
+            SlingSettingsService settingsService) {
+        final SyncTokenConsistencyService created = new SyncTokenConsistencyService();
+        // arguments are checked in declaration order, the first null one is reported
+        if (commonsConfig == null) {
+            throw new IllegalArgumentException("commonsConfig must not be null");
+        }
+        if (resourceResolverFactory == null) {
+            throw new IllegalArgumentException("resourceResolverFactory must not be null");
+        }
+        if (settingsService == null) {
+            throw new IllegalArgumentException("settingsService must not be null");
+        }
+        created.settingsService = settingsService;
+        created.resourceResolverFactory = resourceResolverFactory;
+        created.commonsConfig = commonsConfig;
+        return created;
+    }
+
+    /**
+     * Reads the slingId from the settingsService, stores it in the
+     * slingId field and logs it.
+     */
+    @Activate
+    protected void activate() {
+        final String localSlingId = settingsService.getSlingId();
+        this.slingId = localSlingId;
+        logger.info("activate: activated with slingId="+localSlingId);
+    }
/** Get or create a ResourceResolver **/
protected ResourceResolver getResourceResolver() throws LoginException {
- return getResourceResolverFactory().getAdministrativeResourceResolver(null);
+ return resourceResolverFactory.getAdministrativeResourceResolver(null);
}
@Override
public void cancelSync() {
cancelPreviousBackgroundCheck();
}
-
+
@Override
public void sync(BaseTopologyView view, Runnable callback) {
// cancel the previous background-check if it's still running
@@ -105,7 +138,7 @@ public abstract class BaseSyncTokenConsistencyService extends AbstractServiceWit
// 2) then check if all others have done the same already
return seenAllSyncTokens(view);
}
- }, callback, getCommonsConfig().getBgTimeoutMillis(), getCommonsConfig().getBgIntervalMillis());
+ }, callback, commonsConfig.getBgTimeoutMillis(), commonsConfig.getBgIntervalMillis());
}
private boolean storeMySyncToken(String syncTokenId) {
@@ -151,7 +184,7 @@ public abstract class BaseSyncTokenConsistencyService extends AbstractServiceWit
}
private String getSyncTokenPath() {
- return getCommonsConfig().getSyncTokenPath();
+ return commonsConfig.getSyncTokenPath();
}
private boolean seenAllSyncTokens(BaseTopologyView view) {
@@ -212,44 +245,4 @@ public abstract class BaseSyncTokenConsistencyService extends AbstractServiceWit
}
}
- public List<String> getSyncHistory() {
- List<HistoryEntry> snapshot;
- synchronized(history) {
- snapshot = Collections.unmodifiableList(history);
- }
- List<String> result = new ArrayList<String>(snapshot.size());
- for (HistoryEntry historyEntry : snapshot) {
- result.add(historyEntry.fullLine);
- }
- return result;
- }
-
- protected void addHistoryEntry(BaseTopologyView view, String msg) {
- synchronized(history) {
- for(int i = history.size() - 1; i>=0; i--) {
- HistoryEntry entry = history.get(i);
- if (!entry.view.equals(view)) {
- // don't filter if the view starts differing,
- // only filter for the last few entries where
- // the view is equal
- break;
- }
- if (entry.msg.equals(msg)) {
- // if the view is equal and the msg matches
- // then this is a duplicate entry, so ignore
- return;
- }
- }
- String fullLine = sdf.format(Calendar.getInstance().getTime()) + ": " + msg;
- HistoryEntry newEntry = new HistoryEntry();
- newEntry.view = view;
- newEntry.fullLine = fullLine;
- newEntry.msg = msg;
- history.add(newEntry);
- while (history.size() > 12) {
- history.remove(0);
- }
- }
- }
-
}
diff --git a/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/SyncTokenOnlyConsistencyService.java b/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/SyncTokenOnlyConsistencyService.java
deleted file mode 100644
index 32438ea..0000000
--- a/src/main/java/org/apache/sling/discovery/commons/providers/spi/base/SyncTokenOnlyConsistencyService.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.sling.discovery.commons.providers.spi.base;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.Service;
-import org.apache.sling.api.resource.ResourceResolverFactory;
-import org.apache.sling.discovery.commons.providers.spi.ConsistencyService;
-import org.apache.sling.settings.SlingSettingsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Implements the 'sync-token' part of the ConsistencyService,
- * but not the 'wait while backlog' part (which is left to subclasses
- * if needed).
- */
-@Component(immediate = false)
-@Service(value = { ConsistencyService.class })
-public class SyncTokenOnlyConsistencyService extends BaseSyncTokenConsistencyService {
-
- protected final Logger logger = LoggerFactory.getLogger(getClass());
-
- @Reference
- protected DiscoveryLiteConfig commonsConfig;
-
- @Reference
- protected ResourceResolverFactory resourceResolverFactory;
-
- @Reference
- protected SlingSettingsService settingsService;
-
- public static BaseSyncTokenConsistencyService testConstructorAndActivate(
- DiscoveryLiteConfig commonsConfig,
- ResourceResolverFactory resourceResolverFactory,
- SlingSettingsService settingsService) {
- SyncTokenOnlyConsistencyService service = testConstructor(commonsConfig, resourceResolverFactory, settingsService);
- service.activate();
- return service;
- }
-
- public static SyncTokenOnlyConsistencyService testConstructor(
- DiscoveryLiteConfig commonsConfig,
- ResourceResolverFactory resourceResolverFactory,
- SlingSettingsService settingsService) {
- SyncTokenOnlyConsistencyService service = new SyncTokenOnlyConsistencyService();
- if (commonsConfig == null) {
- throw new IllegalArgumentException("commonsConfig must not be null");
- }
- if (resourceResolverFactory == null) {
- throw new IllegalArgumentException("resourceResolverFactory must not be null");
- }
- if (settingsService == null) {
- throw new IllegalArgumentException("settingsService must not be null");
- }
- service.commonsConfig = commonsConfig;
- service.resourceResolverFactory = resourceResolverFactory;
- service.settingsService = settingsService;
- return service;
- }
-
- @Activate
- protected void activate() {
- this.slingId = getSettingsService().getSlingId();
- logger.info("activate: activated with slingId="+slingId);
- }
-
- @Override
- protected DiscoveryLiteConfig getCommonsConfig() {
- return commonsConfig;
- }
-
- @Override
- protected ResourceResolverFactory getResourceResolverFactory() {
- return resourceResolverFactory;
- }
-
- @Override
- protected SlingSettingsService getSettingsService() {
- return settingsService;
- }
-
-}
--
To stop receiving notification emails like this one, please contact
"commits@sling.apache.org" <co...@sling.apache.org>.