You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@cocoon.apache.org by mp...@apache.org on 2006/02/22 16:26:29 UTC
svn commit: r379792 - in /cocoon/branches/BRANCH_2_1_X/src:
blocks/eventcache/java/org/apache/cocoon/samples/
blocks/eventcache/samples/ java/org/apache/cocoon/components/pipeline/impl/
Author: mpfingsthorn
Date: Wed Feb 22 07:26:24 2006
New Revision: 379792
URL: http://svn.apache.org/viewcvs?rev=379792&view=rev
Log:
- Extended the AbstractCachingProcessingPipeline to synchronize access, i.e. if another thread is already generating new content for a specific cache key, the new thread will wait for the other and show the new cached result instead of computing it another time in parallel with the first.
- Extended the eventcache samples a little to test the above, also with a reader.
Added:
cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/java/org/apache/cocoon/samples/EventAwareReader.java
Modified:
cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/samples/eventcache.xml
cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/samples/sitemap.xmap
cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/pipeline/impl/AbstractCachingProcessingPipeline.java
Added: cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/java/org/apache/cocoon/samples/EventAwareReader.java
URL: http://svn.apache.org/viewcvs/cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/java/org/apache/cocoon/samples/EventAwareReader.java?rev=379792&view=auto
==============================================================================
--- cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/java/org/apache/cocoon/samples/EventAwareReader.java (added)
+++ cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/java/org/apache/cocoon/samples/EventAwareReader.java Wed Feb 22 07:26:24 2006
@@ -0,0 +1,61 @@
+/*
+ * Copyright 1999-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cocoon.samples;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import org.apache.cocoon.ProcessingException;
+import org.apache.cocoon.caching.validity.EventValidity;
+import org.apache.cocoon.caching.validity.NamedEvent;
+import org.apache.cocoon.environment.ObjectModelHelper;
+import org.apache.cocoon.environment.Request;
+import org.apache.cocoon.reading.ResourceReader;
+import org.apache.excalibur.source.SourceValidity;
+
+/**
+ * @author Max Pfingsthorn (mpfingsthorn@hippo.nl)
+ *
+ */
+public class EventAwareReader extends ResourceReader {
+
+ public void generate() throws IOException, ProcessingException {
+ try {
+ long DELAY_SECS = this.parameters.getParameterAsLong("DELAY_SECS", 2);
+ Thread.sleep(DELAY_SECS * 1000L);
+ } catch (InterruptedException ie) {
+ // Not much that can be done...
+ }
+ super.generate();
+ }
+
+ public Serializable getKey() {
+ final Request request = ObjectModelHelper.getRequest(this.objectModel);
+ // for our test, pages having the same value of "pageKey" will share
+ // the same cache location
+ String key = request.getParameter("pageKey") ;
+ return ((key==null||"".equals(key)) ? "foo" : key);
+ }
+
+ public SourceValidity getValidity() {
+ final Request request = ObjectModelHelper.getRequest(this.objectModel);
+ String key = request.getParameter("pageKey") ;
+ return new EventValidity(
+ new NamedEvent(
+ (key==null||"".equals(key)) ? "foo" : key));
+ }
+
+}
Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/samples/eventcache.xml
URL: http://svn.apache.org/viewcvs/cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/samples/eventcache.xml?rev=379792&r1=379791&r2=379792&view=diff
==============================================================================
--- cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/samples/eventcache.xml (original)
+++ cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/samples/eventcache.xml Wed Feb 22 07:26:24 2006
@@ -63,6 +63,12 @@
<li><a href="?pageKey=two">pageKey=two</a>
(<a href="action?pageKey=two&event=two">uncache with action</a>)
(<a href="flow?pageKey=two&event=two">uncache with flow</a>)</li>
+ <li><a href="reader?pageKey=foo">reader: pageKey=foo</a>
+ (<a href="action?pageKey=${cocoon.parameters.KEY}&event=foo">uncache with action</a>)
+ (<a href="flow?pageKey=${cocoon.parameters.KEY}&event=foo">uncache with flow</a>)</li>
+ <li><a href="reader?pageKey=bar">reader: pageKey=bar</a>
+ (<a href="action?pageKey=${cocoon.parameters.KEY}&event=bar">uncache with action</a>)
+ (<a href="flow?pageKey=${cocoon.parameters.KEY}&event=bar">uncache with flow</a>)</li>
</ul>
Note: the random numbers you see included in the url after an uncache link
serve two purposes in the example, making it easier to see the effect of the
Modified: cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/samples/sitemap.xmap
URL: http://svn.apache.org/viewcvs/cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/samples/sitemap.xmap?rev=379792&r1=379791&r2=379792&view=diff
==============================================================================
--- cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/samples/sitemap.xmap (original)
+++ cocoon/branches/BRANCH_2_1_X/src/blocks/eventcache/samples/sitemap.xmap Wed Feb 22 07:26:24 2006
@@ -26,13 +26,16 @@
<map:generators>
<map:generator name="sample" src="org.apache.cocoon.samples.EventAwareGenerator"/>
</map:generators>
+ <map:readers>
+ <map:reader name="sample" src="org.apache.cocoon.samples.EventAwareReader"/>
+ </map:readers>
<map:actions>
<map:action name="cacheevent" src="org.apache.cocoon.acting.CacheEventAction"/>
</map:actions>
<map:pipes default="caching">
<!-- A pipe must be defined configured to use the EventAware cache. -->
- <map:pipe name="event-aware" src="org.apache.cocoon.components.pipeline.impl.CachingProcessingPipeline">
+ <map:pipe logger="core.sitemap" name="event-aware" src="org.apache.cocoon.components.pipeline.impl.CachingProcessingPipeline">
<parameter name="cache-role" value="org.apache.cocoon.caching.Cache/EventAware"/>
</map:pipe>
</map:pipes>
@@ -61,19 +64,26 @@
</map:act>
<map:redirect-to uri="demo?pageKey={request-param:pageKey}&rand={random:x}"/>
</map:match>
+
+ <map:match pattern="reader">
+ <map:read type="sample" src="eventcache.xml">
+ <map:parameter name="DELAY_SECS" value="4"/>
+ </map:read>
+ </map:match>
+
<map:match pattern="*">
<map:generate type="sample" src="eventcache.xml">
- <map:parameter name="DELAY_SECS" value="2"/>
+ <map:parameter name="DELAY_SECS" value="4"/>
<map:parameter name="DATE" value="{date:date}"/>
<map:parameter name="KEY" value="{request-param:pageKey}"/>
</map:generate>
<map:transform src="context://samples/stylesheets/dynamic-page2html.xsl">
<map:parameter name="servletPath" value="{request:servletPath}"/>
- <map:parameter name="sitemapURI" value="{request:sitemapURI}"/>
- <map:parameter name="contextPath" value="{request:contextPath}"/>
- <map:parameter name="file" value="eventcache.xsp"/>
- <map:parameter name="remove" value="{0}"/>
- </map:transform>
+ <map:parameter name="sitemapURI" value="{request:sitemapURI}"/>
+ <map:parameter name="contextPath" value="{request:contextPath}"/>
+ <map:parameter name="file" value="eventcache.xsp"/>
+ <map:parameter name="remove" value="{0}"/>
+ </map:transform>
<map:serialize/>
</map:match>
</map:pipeline>
Modified: cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/pipeline/impl/AbstractCachingProcessingPipeline.java
URL: http://svn.apache.org/viewcvs/cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/pipeline/impl/AbstractCachingProcessingPipeline.java?rev=379792&r1=379791&r2=379792&view=diff
==============================================================================
--- cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/pipeline/impl/AbstractCachingProcessingPipeline.java (original)
+++ cocoon/branches/BRANCH_2_1_X/src/java/org/apache/cocoon/components/pipeline/impl/AbstractCachingProcessingPipeline.java Wed Feb 22 07:26:24 2006
@@ -15,6 +15,7 @@
*/
package org.apache.cocoon.components.pipeline.impl;
+import org.apache.avalon.framework.component.ComponentException;
import org.apache.avalon.framework.parameters.ParameterException;
import org.apache.avalon.framework.parameters.Parameters;
@@ -35,8 +36,10 @@
import org.apache.excalibur.source.impl.validity.AggregatedValidity;
import org.apache.excalibur.source.impl.validity.DeferredValidity;
import org.apache.excalibur.source.impl.validity.NOPValidity;
+import org.apache.excalibur.store.Store;
import java.io.ByteArrayOutputStream;
+import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
@@ -53,6 +56,8 @@
*/
public abstract class AbstractCachingProcessingPipeline extends BaseCachingProcessingPipeline {
+ public static final String PIPELOCK_PREFIX = "PIPELOCK:";
+
/** The role name of the generator */
protected String generatorRole;
@@ -99,6 +104,8 @@
/** Default setting for smart caching */
protected boolean configuredDoSmartCaching;
+
+ protected Store transientStore = null;
/** Abstract method defined in subclasses */
@@ -124,6 +131,16 @@
super.parameterize(params);
this.configuredDoSmartCaching =
params.getParameterAsBoolean("smart-caching", true);
+
+ String storeRole = params.getParameter("store-role",Store.TRANSIENT_STORE);
+
+ try {
+ transientStore = (Store) manager.lookup(storeRole);
+ } catch (ComponentException e) {
+ if(getLogger().isDebugEnabled()) {
+ getLogger().debug("Could not look up transient store, synchronizing requests will not work!",e);
+ }
+ }
}
/**
@@ -173,7 +190,107 @@
super.setReader(role, source, param, mimeType);
this.readerRole = role;
}
-
+
+ protected boolean waitForLock(Object key) {
+ if(transientStore != null) {
+ Object lock = null;
+ synchronized(transientStore) {
+ String lockKey = PIPELOCK_PREFIX+key;
+ if(transientStore.containsKey(lockKey)) {
+ // cache content is currently being generated, wait for other thread
+ lock = transientStore.get(lockKey);
+ }
+ }
+ if(lock != null) {
+ try {
+ // become owner of monitor
+ synchronized(lock) {
+ lock.wait();
+ }
+ } catch (InterruptedException e) {
+ if(getLogger().isDebugEnabled()) {
+ getLogger().debug("Got interrupted waiting for other pipeline to finish processing, retrying...",e);
+ }
+ return false;
+ }
+ if(getLogger().isDebugEnabled()) {
+ getLogger().debug("Other pipeline finished processing, retrying to get cached response.");
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * makes the lock (instantiates a new object and puts it into the store)
+ */
+ protected boolean generateLock(Object key) {
+ boolean succeeded = true;
+
+ if( transientStore != null && key != null ) {
+ String lockKey = PIPELOCK_PREFIX+key;
+ synchronized(transientStore) {
+ if(transientStore.containsKey(lockKey)) {
+ succeeded = false;
+ if(getLogger().isDebugEnabled()) {
+ getLogger().debug("Lock already present in the store!");
+ }
+ } else {
+ Object lock = new Object();
+ try {
+ transientStore.store(lockKey, lock);
+ } catch (IOException e) {
+ if(getLogger().isDebugEnabled()) {
+ getLogger().debug("Could not put lock in the store!",e);
+ }
+ succeeded = false;
+ }
+ }
+ }
+ }
+
+ return succeeded;
+ }
+
+ /**
+ * releases the lock (notifies it and removes it from the store)
+ */
+ protected boolean releaseLock(Object key) {
+ boolean succeeded = true;
+
+ if( transientStore != null && key != null ) {
+ String lockKey = PIPELOCK_PREFIX+key;
+ Object lock = null;
+ synchronized(transientStore) {
+ if(!transientStore.containsKey(lockKey)) {
+ succeeded = false;
+ if(getLogger().isDebugEnabled()) {
+ getLogger().debug("Lock not present in the store!");
+ }
+ } else {
+ try {
+ lock = transientStore.get(lockKey);
+ transientStore.remove(lockKey);
+ } catch (Exception e) {
+ if(getLogger().isDebugEnabled()) {
+ getLogger().debug("Could not get lock from the store!",e);
+ }
+ succeeded = false;
+ }
+ }
+ }
+ if(succeeded && lock != null) {
+ // become monitor owner
+ synchronized(lock) {
+ lock.notifyAll();
+ }
+ }
+ }
+
+ return succeeded;
+ }
+
/**
* Process the given <code>Environment</code>, producing the output.
*/
@@ -216,6 +333,8 @@
"' using key " + this.toCacheKey);
}
+ generateLock(this.toCacheKey);
+
try {
OutputStream os = null;
@@ -279,6 +398,8 @@
} catch (Exception e) {
handleException(e);
+ } finally {
+ releaseLock(this.toCacheKey);
}
return true;
@@ -580,6 +701,12 @@
}
} else {
+ // check if there might be one being generated
+ if(!waitForLock(this.fromCacheKey)) {
+ finished = false;
+ continue;
+ }
+
// no cached response found
if (this.getLogger().isDebugEnabled()) {
this.getLogger().debug(
@@ -666,7 +793,9 @@
readerKey = new Long(((Cacheable)super.reader).generateKey());
}
+ boolean finished = false;
if (readerKey != null) {
+
// response is cacheable, build the key
pcKey = new PipelineCacheKey();
pcKey.addKey(new ComponentCacheKey(ComponentCacheKey.ComponentType_Reader,
@@ -674,129 +803,148 @@
readerKey)
);
- // now we have the key to get the cached object
- CachedResponse cachedObject = this.cache.get(pcKey);
- if (cachedObject != null) {
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("Found cached response for '" +
- environment.getURI() + "' using key: " + pcKey);
- }
-
- SourceValidity[] validities = cachedObject.getValidityObjects();
- if (validities == null || validities.length != 1) {
- // to avoid getting here again and again, we delete it
- this.cache.remove(pcKey);
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("Cached response for '" + environment.getURI() +
- "' using key: " + pcKey + " is invalid.");
- }
- this.cachedResponse = null;
- } else {
- SourceValidity cachedValidity = validities[0];
- boolean isValid = false;
- int valid = cachedValidity.isValid();
- if (valid == SourceValidity.UNKNOWN) {
- // get reader validity and compare
- if (isCacheableProcessingComponent) {
- readerValidity = ((CacheableProcessingComponent) super.reader).getValidity();
- } else {
- CacheValidity cv = ((Cacheable) super.reader).generateValidity();
- if (cv != null) {
- readerValidity = CacheValidityToSourceValidity.createValidity(cv);
- }
- }
- if (readerValidity != null) {
- valid = cachedValidity.isValid(readerValidity);
- if (valid == SourceValidity.UNKNOWN) {
- readerValidity = null;
- } else {
- isValid = (valid == SourceValidity.VALID);
- }
- }
- } else {
- isValid = (valid == SourceValidity.VALID);
- }
-
- if (isValid) {
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("processReader: using valid cached content for '" +
- environment.getURI() + "'.");
- }
- byte[] response = cachedObject.getResponse();
- if (response.length > 0) {
- usedCache = true;
- if (cachedObject.getContentType() != null) {
- environment.setContentType(cachedObject.getContentType());
- } else {
- setMimeTypeForReader(environment);
- }
- outputStream = environment.getOutputStream(0);
- environment.setContentLength(response.length);
- outputStream.write(response);
- }
- } else {
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("processReader: cached content is invalid for '" +
- environment.getURI() + "'.");
- }
- // remove invalid cached object
- this.cache.remove(pcKey);
- }
- }
- }
+ while(!finished) {
+ finished = true;
+ // now we have the key to get the cached object
+ CachedResponse cachedObject = this.cache.get(pcKey);
+ if (cachedObject != null) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Found cached response for '" +
+ environment.getURI() + "' using key: " + pcKey);
+ }
+
+ SourceValidity[] validities = cachedObject.getValidityObjects();
+ if (validities == null || validities.length != 1) {
+ // to avoid getting here again and again, we delete it
+ this.cache.remove(pcKey);
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("Cached response for '" + environment.getURI() +
+ "' using key: " + pcKey + " is invalid.");
+ }
+ this.cachedResponse = null;
+ } else {
+ SourceValidity cachedValidity = validities[0];
+ boolean isValid = false;
+ int valid = cachedValidity.isValid();
+ if (valid == SourceValidity.UNKNOWN) {
+ // get reader validity and compare
+ if (isCacheableProcessingComponent) {
+ readerValidity = ((CacheableProcessingComponent) super.reader).getValidity();
+ } else {
+ CacheValidity cv = ((Cacheable) super.reader).generateValidity();
+ if (cv != null) {
+ readerValidity = CacheValidityToSourceValidity.createValidity(cv);
+ }
+ }
+ if (readerValidity != null) {
+ valid = cachedValidity.isValid(readerValidity);
+ if (valid == SourceValidity.UNKNOWN) {
+ readerValidity = null;
+ } else {
+ isValid = (valid == SourceValidity.VALID);
+ }
+ }
+ } else {
+ isValid = (valid == SourceValidity.VALID);
+ }
+
+ if (isValid) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("processReader: using valid cached content for '" +
+ environment.getURI() + "'.");
+ }
+ byte[] response = cachedObject.getResponse();
+ if (response.length > 0) {
+ usedCache = true;
+ if (cachedObject.getContentType() != null) {
+ environment.setContentType(cachedObject.getContentType());
+ } else {
+ setMimeTypeForReader(environment);
+ }
+ outputStream = environment.getOutputStream(0);
+ environment.setContentLength(response.length);
+ outputStream.write(response);
+ }
+ } else {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("processReader: cached content is invalid for '" +
+ environment.getURI() + "'.");
+ }
+ // remove invalid cached object
+ this.cache.remove(pcKey);
+ }
+ }
+ } else {
+ // check if something is being generated right now
+ if(!waitForLock(pcKey)) {
+ finished = false;
+ continue;
+ }
+ }
+ }
}
if (!usedCache) {
- if (pcKey != null) {
- if (getLogger().isDebugEnabled()) {
- getLogger().debug("processReader: caching content for further requests of '" +
- environment.getURI() + "'.");
- }
-
- if (readerValidity == null) {
- if (isCacheableProcessingComponent) {
- readerValidity = ((CacheableProcessingComponent)super.reader).getValidity();
- } else {
- CacheValidity cv = ((Cacheable)super.reader).generateValidity();
- if ( cv != null ) {
- readerValidity = CacheValidityToSourceValidity.createValidity( cv );
- }
- }
- }
-
- if (readerValidity != null) {
- outputStream = environment.getOutputStream(this.outputBufferSize);
- outputStream = new CachingOutputStream(outputStream);
- } else {
- pcKey = null;
- }
- }
-
- setMimeTypeForReader(environment);
- if (this.reader.shouldSetContentLength()) {
- ByteArrayOutputStream os = new ByteArrayOutputStream();
- this.reader.setOutputStream(os);
- this.reader.generate();
- environment.setContentLength(os.size());
- if (outputStream == null) {
- outputStream = environment.getOutputStream(0);
- }
- os.writeTo(outputStream);
- } else {
- if (outputStream == null) {
- outputStream = environment.getOutputStream(this.outputBufferSize);
- }
- this.reader.setOutputStream(outputStream);
- this.reader.generate();
- }
-
- // store the response
- if (pcKey != null) {
- final CachedResponse res = new CachedResponse(new SourceValidity[] {readerValidity},
- ((CachingOutputStream)outputStream).getContent());
- res.setContentType(environment.getContentType());
- this.cache.store(pcKey, res);
- }
+ // make sure lock will be released
+ try {
+ if (pcKey != null) {
+ if (getLogger().isDebugEnabled()) {
+ getLogger().debug("processReader: caching content for further requests of '" +
+ environment.getURI() + "'.");
+ }
+ generateLock(pcKey);
+
+ if (readerValidity == null) {
+ if (isCacheableProcessingComponent) {
+ readerValidity = ((CacheableProcessingComponent)super.reader).getValidity();
+ } else {
+ CacheValidity cv = ((Cacheable)super.reader).generateValidity();
+ if ( cv != null ) {
+ readerValidity = CacheValidityToSourceValidity.createValidity( cv );
+ }
+ }
+ }
+
+ if (readerValidity != null) {
+ outputStream = environment.getOutputStream(this.outputBufferSize);
+ outputStream = new CachingOutputStream(outputStream);
+ } else {
+ pcKey = null;
+ }
+ }
+
+ setMimeTypeForReader(environment);
+ if (this.reader.shouldSetContentLength()) {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ this.reader.setOutputStream(os);
+ this.reader.generate();
+ environment.setContentLength(os.size());
+ if (outputStream == null) {
+ outputStream = environment.getOutputStream(0);
+ }
+ os.writeTo(outputStream);
+ } else {
+ if (outputStream == null) {
+ outputStream = environment.getOutputStream(this.outputBufferSize);
+ }
+ this.reader.setOutputStream(outputStream);
+ this.reader.generate();
+ }
+
+ // store the response
+ if (pcKey != null) {
+ final CachedResponse res = new CachedResponse(new SourceValidity[] {readerValidity},
+ ((CachingOutputStream)outputStream).getContent());
+ res.setContentType(environment.getContentType());
+ this.cache.store(pcKey, res);
+ }
+
+ } finally {
+ if (pcKey != null) {
+ releaseLock(pcKey);
+ }
+ }
+
}
} catch (Exception e) {
handleException(e);