You are viewing a plain text version of this content. The canonical link for it is here.
Posted to cvs@cocoon.apache.org by jb...@apache.org on 2006/10/03 11:49:25 UTC

svn commit: r452380 - in /cocoon/trunk: commons/TO-SYNC-FROM-BRANCH.txt commons/status.xml core/cocoon-core/src/main/java/org/apache/cocoon/components/pipeline/impl/AbstractCachingProcessingPipeline.java

Author: jbq
Date: Tue Oct  3 02:49:24 2006
New Revision: 452380

URL: http://svn.apache.org/viewvc?view=rev&rev=452380
Log:
Ported various caching pipeline fixes and improvements from branch 2.1:
  * locking feature, see http://svn.apache.org/viewcvs.cgi?rev=379792&view=rev
  * COCOON-1279: caching point pipelines and smart caching, see http://issues.apache.org/jira/browse/COCOON-1279
  * COCOON-1799: Threads waste when reading a not found resource, see http://issues.apache.org/jira/browse/COCOON-1799

Modified:
    cocoon/trunk/commons/TO-SYNC-FROM-BRANCH.txt
    cocoon/trunk/commons/status.xml
    cocoon/trunk/core/cocoon-core/src/main/java/org/apache/cocoon/components/pipeline/impl/AbstractCachingProcessingPipeline.java

Modified: cocoon/trunk/commons/TO-SYNC-FROM-BRANCH.txt
URL: http://svn.apache.org/viewvc/cocoon/trunk/commons/TO-SYNC-FROM-BRANCH.txt?view=diff&rev=452380&r1=452379&r2=452380
==============================================================================
--- cocoon/trunk/commons/TO-SYNC-FROM-BRANCH.txt (original)
+++ cocoon/trunk/commons/TO-SYNC-FROM-BRANCH.txt Tue Oct  3 02:49:24 2006
@@ -20,8 +20,3 @@
 HtmlUnit test framework is in branch only.
 
 Tour block has been updated for 2.1.8, need to be synced and tested in trunk.
-
-AbstractCachingProcessingPipeline:
-  * locking feature, see http://svn.apache.org/viewcvs.cgi?rev=379792&view=rev
-  * COCOON-1279: caching point pipelines and smart caching, see http://issues.apache.org/jira/browse/COCOON-1279
-  * COCOON-1799: Threads waste when reading a not found resource, see http://issues.apache.org/jira/browse/COCOON-1799

Modified: cocoon/trunk/commons/status.xml
URL: http://svn.apache.org/viewvc/cocoon/trunk/commons/status.xml?view=diff&rev=452380&r1=452379&r2=452380
==============================================================================
--- cocoon/trunk/commons/status.xml (original)
+++ cocoon/trunk/commons/status.xml Tue Oct  3 02:49:24 2006
@@ -104,6 +104,7 @@
   <person name="Konstantin Piroumian" email="kpiroumian@apache.org" id="KP"/>
   <person name="Marc Portier" email="mpo@apache.org" id="MPO"/>
   <person name="Ovidiu Predescu" email="ovidiu@apache.org" id="OP"/>
+  <person name="Jean-Baptiste Quenot" email="jbq@apache.org" id="JBQ"/>
   <person name="Jeremy Quinn" email="jeremy@apache.org" id="JQ"/>
   <person name="Reinhard P&#246;tz" email="reinhard@apache.org" id="RP"/>
   <person name="Gianugo Rabellino" email="gianugo@apache.org" id="GR"/>
@@ -177,6 +178,14 @@
   <!-- These are the changes from the last 2.1.x version. -->
  <changes>
   <release version="@version@" date="@date@">
+    <action dev="JBQ" type="update">
+      Ported various caching pipeline fixes and improvements from branch 2.1:
+      <ul>
+        <li>locking feature, see http://svn.apache.org/viewcvs.cgi?rev=379792&view=rev</li>
+        <li>COCOON-1279: caching point pipelines and smart caching, see http://issues.apache.org/jira/browse/COCOON-1279</li>
+        <li>COCOON-1799: Threads waste when reading a not found resource, see http://issues.apache.org/jira/browse/COCOON-1799</li>
+      </ul>
+    </action>
     <action dev="VG" type="update">
       Move BackgroundEnvironment from Cron block into the core.
     </action>

Modified: cocoon/trunk/core/cocoon-core/src/main/java/org/apache/cocoon/components/pipeline/impl/AbstractCachingProcessingPipeline.java
URL: http://svn.apache.org/viewvc/cocoon/trunk/core/cocoon-core/src/main/java/org/apache/cocoon/components/pipeline/impl/AbstractCachingProcessingPipeline.java?view=diff&rev=452380&r1=452379&r2=452380
==============================================================================
--- cocoon/trunk/core/cocoon-core/src/main/java/org/apache/cocoon/components/pipeline/impl/AbstractCachingProcessingPipeline.java (original)
+++ cocoon/trunk/core/cocoon-core/src/main/java/org/apache/cocoon/components/pipeline/impl/AbstractCachingProcessingPipeline.java Tue Oct  3 02:49:24 2006
@@ -16,9 +16,17 @@
  */
 package org.apache.cocoon.components.pipeline.impl;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Date;
+
+import org.apache.avalon.framework.component.ComponentException;
 import org.apache.avalon.framework.parameters.ParameterException;
 import org.apache.avalon.framework.parameters.Parameters;
-
+import org.apache.avalon.framework.service.ServiceException;
 import org.apache.cocoon.ProcessingException;
 import org.apache.cocoon.caching.CacheableProcessingComponent;
 import org.apache.cocoon.caching.CachedResponse;
@@ -28,17 +36,11 @@
 import org.apache.cocoon.environment.Environment;
 import org.apache.cocoon.transformation.Transformer;
 import org.apache.cocoon.util.HashUtil;
-
 import org.apache.excalibur.source.SourceValidity;
 import org.apache.excalibur.source.impl.validity.AggregatedValidity;
 import org.apache.excalibur.source.impl.validity.DeferredValidity;
 import org.apache.excalibur.source.impl.validity.NOPValidity;
-
-import java.io.ByteArrayOutputStream;
-import java.io.OutputStream;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Date;
+import org.apache.excalibur.store.Store;
 
 /**
  * This is the base class for all caching pipeline implementations
@@ -49,6 +51,8 @@
  */
 public abstract class AbstractCachingProcessingPipeline extends BaseCachingProcessingPipeline {
 
+    public static final String PIPELOCK_PREFIX = "PIPELOCK:";
+
     /** The role name of the generator */
     protected String generatorRole;
 
@@ -86,50 +90,46 @@
     /** Cache complete response */
     protected boolean cacheCompleteResponse;
 
-    /** Smart caching ? */
-    protected boolean doSmartCaching;
+    protected Store transientStore = null;
 
-    /** Default setting for smart caching */
-    protected boolean configuredDoSmartCaching;
-
-    /**
-     * Abstract methods defined in subclasses
-     */
+    /** Abstract method defined in subclasses */
     protected abstract void cacheResults(Environment environment,
-                                         OutputStream os)
-    throws Exception;
+            OutputStream os)
+        throws Exception;
 
+    /** Abstract method defined in subclasses */
     protected abstract ComponentCacheKey newComponentCacheKey(int type,
-                                                              String role,
-                                                              Serializable key);
+            String role,
+            Serializable key);
 
+    /** Abstract method defined in subclasses */
     protected abstract void connectCachingPipeline(Environment environment)
-    throws ProcessingException;
+        throws ProcessingException;
 
     /**
      * Parameterizable Interface - Configuration
      */
     public void parameterize(Parameters params)
-    throws ParameterException {
+        throws ParameterException {
         super.parameterize(params);
-        this.configuredDoSmartCaching = params.getParameterAsBoolean("smart-caching", true);
-    }
 
-    /**
-     * Setup this component
-     */
-    public void setup(Parameters params) {
-        super.setup(params);
-        this.doSmartCaching = params.getParameterAsBoolean("smart-caching",
-                                                           this.configuredDoSmartCaching);
+        String storeRole = params.getParameter("store-role",Store.TRANSIENT_STORE); 
+
+        try {
+            transientStore = (Store) manager.lookup(storeRole);
+        } catch (ServiceException e) {
+            if(getLogger().isDebugEnabled()) {
+                getLogger().debug("Could not look up transient store, synchronizing requests will not work!",e);
+            }
+        }
     }
 
     /**
      * Set the generator.
      */
     public void setGenerator (String role, String source, Parameters param,
-                              Parameters hintParam)
-    throws ProcessingException {
+            Parameters hintParam)
+        throws ProcessingException {
         super.setGenerator(role, source, param, hintParam);
         this.generatorRole = role;
     }
@@ -138,8 +138,7 @@
      * Add a transformer.
      */
     public void addTransformer (String role, String source, Parameters param,
-                                Parameters hintParam)
-    throws ProcessingException {
+            Parameters hintParam) throws ProcessingException {
         super.addTransformer(role, source, param, hintParam);
         this.transformerRoles.add(role);
     }
@@ -148,8 +147,7 @@
      * Set the serializer.
      */
     public void setSerializer (String role, String source, Parameters param,
-                               Parameters hintParam, String mimeType)
-    throws ProcessingException {
+            Parameters hintParam, String mimeType) throws ProcessingException {
         super.setSerializer(role, source, param, hintParam, mimeType);
         this.serializerRole = role;
     }
@@ -158,17 +156,117 @@
      * Set the Reader.
      */
     public void setReader (String role, String source, Parameters param,
-                           String mimeType)
-    throws ProcessingException {
+            String mimeType)
+        throws ProcessingException {
         super.setReader(role, source, param, mimeType);
         this.readerRole = role;
     }
 
+    protected boolean waitForLock(Object key) {
+        if(transientStore != null) {
+            Object lock = null;
+            synchronized(transientStore) {
+                String lockKey = PIPELOCK_PREFIX+key;
+                if(transientStore.containsKey(lockKey)) {
+                    // cache content is currently being generated, wait for other thread
+                    lock = transientStore.get(lockKey);
+                }
+            }
+            if(lock != null) {
+                try {
+                    // become owner of monitor
+                    synchronized(lock) {
+                        lock.wait();
+                    }
+                } catch (InterruptedException e) {
+                    if(getLogger().isDebugEnabled()) {
+                        getLogger().debug("Got interrupted waiting for other pipeline to finish processing, retrying...",e);
+                    }
+                    return false;
+                }
+                if(getLogger().isDebugEnabled()) {
+                    getLogger().debug("Other pipeline finished processing, retrying to get cached response.");
+                }
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * makes the lock (instantiates a new object and puts it into the store)
+     */
+    protected boolean generateLock(Object key) {
+        boolean succeeded = true;
+
+        if( transientStore != null && key != null ) {
+            String lockKey = PIPELOCK_PREFIX+key;
+            synchronized(transientStore) {
+                if(transientStore.containsKey(lockKey)) {
+                    succeeded = false;
+                    if(getLogger().isDebugEnabled()) {
+                        getLogger().debug("Lock already present in the store!");
+                    }
+                } else {
+                    Object lock = new Object();
+                    try {
+                        transientStore.store(lockKey, lock);
+                    } catch (IOException e) {
+                        if(getLogger().isDebugEnabled()) {
+                            getLogger().debug("Could not put lock in the store!",e);
+                        }
+                        succeeded = false;
+                    }
+                }	
+            }
+        }
+
+        return succeeded;
+    }
+
+    /**
+     * releases the lock (notifies it and removes it from the store)
+     */
+    protected boolean releaseLock(Object key) {
+        boolean succeeded = true;
+
+        if( transientStore != null && key != null ) {
+            String lockKey = PIPELOCK_PREFIX+key;
+            Object lock = null;
+            synchronized(transientStore) {
+                if(!transientStore.containsKey(lockKey)) {
+                    succeeded = false;
+                    if(getLogger().isDebugEnabled()) {
+                        getLogger().debug("Lock not present in the store!");
+                    }
+                } else {
+                    try {
+                        lock = transientStore.get(lockKey);
+                        transientStore.remove(lockKey);
+                    } catch (Exception e) {
+                        if(getLogger().isDebugEnabled()) {
+                            getLogger().debug("Could not get lock from the store!",e);
+                        }
+                        succeeded = false;
+                    }
+                }
+            }
+            if(succeeded && lock != null) {
+                // become monitor owner
+                synchronized(lock) {
+                    lock.notifyAll();
+                }
+            }
+        }
+
+        return succeeded;
+    }
+
     /**
      * Process the given <code>Environment</code>, producing the output.
      */
     protected boolean processXMLPipeline(Environment environment)
-    throws ProcessingException {
+        throws ProcessingException {
         if (this.toCacheKey == null && this.cachedResponse == null) {
             return super.processXMLPipeline(environment);
         }
@@ -202,12 +300,15 @@
             setMimeTypeForSerializer(environment);
             if (getLogger().isDebugEnabled() && this.toCacheKey != null) {
                 getLogger().debug("processXMLPipeline: caching content for further" +
-                                  " requests of '" + environment.getURI() +
-                                  "' using key " + this.toCacheKey);
+                        " requests of '" + environment.getURI() +
+                        "' using key " + this.toCacheKey);
             }
 
+            generateLock(this.toCacheKey);
+
             try {
                 OutputStream os = null;
+
                 if (this.cacheCompleteResponse && this.toCacheKey != null) {
                     os = new CachingOutputStream(environment.getOutputStream(this.outputBufferSize));
                 }
@@ -268,6 +369,8 @@
 
             } catch (Exception e) {
                 handleException(e);
+            } finally {
+                releaseLock(this.toCacheKey);
             }
 
             return true;
@@ -280,7 +383,7 @@
      * The components of the pipeline are checked if they are Cacheable.
      */
     protected void generateCachingKey(Environment environment)
-    throws ProcessingException {
+        throws ProcessingException {
 
         this.toCacheKey = null;
 
@@ -303,8 +406,8 @@
             this.toCacheKey = new PipelineCacheKey();
             this.toCacheKey.addKey(
                     this.newComponentCacheKey(
-                            ComponentCacheKey.ComponentType_Generator,
-                            this.generatorRole, key));
+                        ComponentCacheKey.ComponentType_Generator,
+                        this.generatorRole, key));
 
             // now testing transformers
             final int transformerSize = super.transformers.size();
@@ -320,10 +423,10 @@
                 if (key != null) {
                     this.toCacheKey.addKey(
                             this.newComponentCacheKey(
-                                    ComponentCacheKey.ComponentType_Transformer,
-                                    (String)this.transformerRoles.get(
-                                            this.firstNotCacheableTransformerIndex),
-                                            key));
+                                ComponentCacheKey.ComponentType_Transformer,
+                                (String)this.transformerRoles.get(
+                                                                  this.firstNotCacheableTransformerIndex),
+                                key));
 
                     this.firstNotCacheableTransformerIndex++;
                 } else {
@@ -342,12 +445,12 @@
                 if (key != null) {
                     this.toCacheKey.addKey(
                             this.newComponentCacheKey(
-                                    ComponentCacheKey.ComponentType_Serializer,
-                                    this.serializerRole,
-                                    key));
+                                ComponentCacheKey.ComponentType_Serializer,
+                                this.serializerRole,
+                                key));
                     this.cacheCompleteResponse = true;
                 }
-            }
+                    }
         }
     }
 
@@ -361,7 +464,7 @@
             // a cached response or when the cached response does
             // cache less than now is cacheable
             if (this.fromCacheKey == null
-                || this.fromCacheKey.size() < this.toCacheKey.size()) {
+                    || this.fromCacheKey.size() < this.toCacheKey.size()) {
 
                 this.toCacheSourceValidities =
                     new SourceValidity[this.toCacheKey.size()];
@@ -373,7 +476,7 @@
 
                     if (validity == null) {
                         if (i > 0
-                            && (this.fromCacheKey == null
+                                && (this.fromCacheKey == null
                                     || i > this.fromCacheKey.size())) {
                             // shorten key
                             for (int m=i; m < this.toCacheSourceValidities.length; m++) {
@@ -412,7 +515,7 @@
      * handle expires properly.
      */
     protected void validatePipeline(Environment environment)
-    throws ProcessingException {
+        throws ProcessingException {
         this.completeResponseIsCached = this.cacheCompleteResponse;
         this.fromCacheKey = this.toCacheKey.copy();
         this.firstProcessedTransformerIndex = this.firstNotCacheableTransformerIndex;
@@ -427,7 +530,7 @@
             if (response != null) {
                 if (getLogger().isDebugEnabled()) {
                     getLogger().debug("Found cached response for '" + environment.getURI() +
-                                      "' using key: " + this.fromCacheKey);
+                            "' using key: " + this.fromCacheKey);
                 }
 
                 boolean responseIsValid = true;
@@ -439,21 +542,21 @@
 
                 if (responseExpires != null) {
                     if (getLogger().isDebugEnabled()) {
-                       getLogger().debug("Expires time found for " + environment.getURI());
+                        getLogger().debug("Expires time found for " + environment.getURI());
                     }
 
                     if (responseExpires.longValue() > System.currentTimeMillis()) {
                         if (getLogger().isDebugEnabled()) {
                             getLogger().debug("Expires time still fresh for " + environment.getURI() +
-                                              ", ignoring all other cache settings. This entry expires on "+
-                                              new Date(responseExpires.longValue()));
+                                    ", ignoring all other cache settings. This entry expires on "+
+                                    new Date(responseExpires.longValue()));
                         }
                         this.cachedResponse = response;
                         return;
                     } else {
                         if (getLogger().isDebugEnabled()) {
                             getLogger().debug("Expires time has expired for " + environment.getURI() +
-                                              ", regenerating content.");
+                                    ", regenerating content.");
                         }
 
                         // If an expires parameter was provided, use it. If this parameter is not available
@@ -510,12 +613,12 @@
                             responseIsUsable = false;
                             if (getLogger().isDebugEnabled()) {
                                 getLogger().debug("validatePipeline: responseIsUsable is false, valid=" +
-                                                  valid + " at index " + i);
+                                        valid + " at index " + i);
                             }
                         } else {
                             if (getLogger().isDebugEnabled()) {
                                 getLogger().debug("validatePipeline: responseIsValid is false due to " +
-                                                  validity);
+                                        validity);
                             }
                         }
                     } else {
@@ -526,7 +629,7 @@
                 if (responseIsValid) {
                     if (getLogger().isDebugEnabled()) {
                         getLogger().debug("validatePipeline: using valid cached content for '" +
-                                          environment.getURI() + "'.");
+                                environment.getURI() + "'.");
                     }
 
                     // we are valid, ok that's it
@@ -535,7 +638,7 @@
                 } else {
                     if (getLogger().isDebugEnabled()) {
                         getLogger().debug("validatePipeline: cached content is invalid for '" +
-                                          environment.getURI() + "'.");
+                                environment.getURI() + "'.");
                     }
                     // we are not valid!
 
@@ -574,59 +677,58 @@
                 }
             } else {
 
+                // check if there might be one being generated
+                if(!waitForLock(this.fromCacheKey)) {
+                    finished = false;
+                    continue;
+                }
+
                 // no cached response found
                 if (this.getLogger().isDebugEnabled()) {
                     this.getLogger().debug(
-                        "Cached response not found for '" + environment.getURI() +
-                        "' using key: " +  this.fromCacheKey
-                    );
+                            "Cached response not found for '" + environment.getURI() +
+                            "' using key: " +  this.fromCacheKey
+                            );
                 }
 
-                if (!this.doSmartCaching) {
-                    // try a shorter key
-                    if (this.fromCacheKey.size() > 1) {
-                        this.fromCacheKey.removeLastKey();
-                        if (!this.completeResponseIsCached) {
-                            this.firstProcessedTransformerIndex--;
-                        }
-                        finished = false;
-                    } else {
-                        this.fromCacheKey = null;
-                    }
-                } else {
-                    // stop on longest key for smart caching
-                    this.fromCacheKey = null;
-                }
+                finished = setupFromCacheKey();
                 this.completeResponseIsCached = false;
             }
         }
 
     }
 
+    boolean setupFromCacheKey() {
+        // stop on longest key for smart caching
+        this.fromCacheKey = null;
+        return true;
+    }
+
     /**
      * Setup the evenet pipeline.
      * The components of the pipeline are checked if they are
      * Cacheable.
      */
     protected void setupPipeline(Environment environment)
-    throws ProcessingException {
-        super.setupPipeline( environment );
+        throws ProcessingException {
+        super.setupPipeline(environment);
 
-        // generate the key to fill the cache
-        this.generateCachingKey(environment);
+        // Generate the key to fill the cache
+        generateCachingKey(environment);
 
-        // test the cache for a valid response
+        // Test the cache for a valid response
         if (this.toCacheKey != null) {
-            this.validatePipeline(environment);
+            validatePipeline(environment);
         }
-        this.setupValidities();
+
+        setupValidities();
     }
 
     /**
      * Connect the pipeline.
      */
     protected void connectPipeline(Environment   environment)
-    throws ProcessingException {
+        throws ProcessingException {
         if (this.toCacheKey == null && this.cachedResponse == null) {
             super.connectPipeline(environment);
             return;
@@ -642,7 +744,7 @@
      * @throws ProcessingException if an error occurs
      */
     protected boolean processReader(Environment  environment)
-    throws ProcessingException {
+        throws ProcessingException {
         try {
             boolean usedCache = false;
             OutputStream outputStream = null;
@@ -655,123 +757,142 @@
                 readerKey = ((CacheableProcessingComponent)super.reader).getKey();
             }
 
+            boolean finished = false;
+
             if (readerKey != null) {
                 // response is cacheable, build the key
                 pcKey = new PipelineCacheKey();
                 pcKey.addKey(new ComponentCacheKey(ComponentCacheKey.ComponentType_Reader,
-                                                   this.readerRole,
-                                                   readerKey)
-                            );
-
-                // now we have the key to get the cached object
-                CachedResponse cachedObject = this.cache.get(pcKey);
-                if (cachedObject != null) {
-                    if (getLogger().isDebugEnabled()) {
-                        getLogger().debug("Found cached response for '" +
-                                          environment.getURI() + "' using key: " + pcKey);
-                    }
-
-                    SourceValidity[] validities = cachedObject.getValidityObjects();
-                    if (validities == null || validities.length != 1) {
-                        // to avoid getting here again and again, we delete it
-                        this.cache.remove(pcKey);
+                            this.readerRole,
+                            readerKey)
+                        );
+
+                while(!finished) {
+                    finished = true;
+                    // now we have the key to get the cached object
+                    CachedResponse cachedObject = this.cache.get(pcKey);
+                    if (cachedObject != null) {
                         if (getLogger().isDebugEnabled()) {
-                            getLogger().debug("Cached response for '" + environment.getURI() +
-                                              "' using key: " + pcKey + " is invalid.");
-                        }
-                        this.cachedResponse = null;
-                    } else {
-                        SourceValidity cachedValidity = validities[0];
-                        boolean isValid = false;
-                        int valid = cachedValidity.isValid();
-                        if (valid == SourceValidity.UNKNOWN) {
-                            // get reader validity and compare
-                            readerValidity = ((CacheableProcessingComponent) super.reader).getValidity();
-                            if (readerValidity != null) {
-                                valid = cachedValidity.isValid(readerValidity);
-                                if (valid == SourceValidity.UNKNOWN) {
-                                    readerValidity = null;
-                                } else {
-                                    isValid = (valid == SourceValidity.VALID);
-                                }
-                            }
-                        } else {
-                            isValid = (valid == SourceValidity.VALID);
+                            getLogger().debug("Found cached response for '" +
+                                    environment.getURI() + "' using key: " + pcKey);
                         }
 
-                        if (isValid) {
+                        SourceValidity[] validities = cachedObject.getValidityObjects();
+                        if (validities == null || validities.length != 1) {
+                            // to avoid getting here again and again, we delete it
+                            this.cache.remove(pcKey);
                             if (getLogger().isDebugEnabled()) {
-                                getLogger().debug("processReader: using valid cached content for '" +
-                                                  environment.getURI() + "'.");
+                                getLogger().debug("Cached response for '" + environment.getURI() +
+                                        "' using key: " + pcKey + " is invalid.");
                             }
-                            byte[] response = cachedObject.getResponse();
-                            if (response.length > 0) {
-                                usedCache = true;
-                                if (cachedObject.getContentType() != null) {
-                                    environment.setContentType(cachedObject.getContentType());
-                                } else {
-                                    setMimeTypeForReader(environment);
+                            this.cachedResponse = null;
+                        } else {
+                            SourceValidity cachedValidity = validities[0];
+                            boolean isValid = false;
+                            int valid = cachedValidity.isValid();
+                            if (valid == SourceValidity.UNKNOWN) {
+                                // get reader validity and compare
+                                readerValidity = ((CacheableProcessingComponent) super.reader).getValidity();
+                                if (readerValidity != null) {
+                                    valid = cachedValidity.isValid(readerValidity);
+                                    if (valid == SourceValidity.UNKNOWN) {
+                                        readerValidity = null;
+                                    } else {
+                                        isValid = (valid == SourceValidity.VALID);
+                                    }
                                 }
-                                outputStream = environment.getOutputStream(0);
-                                environment.setContentLength(response.length);
-                                outputStream.write(response);
+                            } else {
+                                isValid = (valid == SourceValidity.VALID);
                             }
-                        } else {
-                            if (getLogger().isDebugEnabled()) {
-                                getLogger().debug("processReader: cached content is invalid for '" +
-                                                  environment.getURI() + "'.");
+
+                            if (isValid) {
+                                if (getLogger().isDebugEnabled()) {
+                                    getLogger().debug("processReader: using valid cached content for '" +
+                                            environment.getURI() + "'.");
+                                }
+                                byte[] response = cachedObject.getResponse();
+                                if (response.length > 0) {
+                                    usedCache = true;
+                                    if (cachedObject.getContentType() != null) {
+                                        environment.setContentType(cachedObject.getContentType());
+                                    } else {
+                                        setMimeTypeForReader(environment);
+                                    }
+                                    outputStream = environment.getOutputStream(0);
+                                    environment.setContentLength(response.length);
+                                    outputStream.write(response);
+                                }
+                            } else {
+                                if (getLogger().isDebugEnabled()) {
+                                    getLogger().debug("processReader: cached content is invalid for '" +
+                                            environment.getURI() + "'.");
+                                }
+                                // remove invalid cached object
+                                this.cache.remove(pcKey);
                             }
-                            // remove invalid cached object
-                            this.cache.remove(pcKey);
+                        }
+                    } else {
+                        // check if something is being generated right now
+                        if(!waitForLock(pcKey)) {
+                            finished = false;
+                            continue;
                         }
                     }
                 }
             }
 
             if (!usedCache) {
-                if (pcKey != null) {
-                    if (getLogger().isDebugEnabled()) {
-                        getLogger().debug("processReader: caching content for further requests of '" +
-                                          environment.getURI() + "'.");
-                    }
+                // make sure lock will be released
+                try {
+                    if (pcKey != null) {
+                        if (getLogger().isDebugEnabled()) {
+                            getLogger().debug("processReader: caching content for further requests of '" +
+                                    environment.getURI() + "'.");
+                        }
+                        generateLock(pcKey);
+
+                        if (readerValidity == null) {
+                            readerValidity = ((CacheableProcessingComponent)super.reader).getValidity();
+                        }
 
-                    if (readerValidity == null) {
-                        readerValidity = ((CacheableProcessingComponent)super.reader).getValidity();
+                        if (readerValidity != null) {
+                            outputStream = environment.getOutputStream(this.outputBufferSize);
+                            outputStream = new CachingOutputStream(outputStream);
+                        }
                     }
 
-                    if (readerValidity != null) {
-                        outputStream = environment.getOutputStream(this.outputBufferSize);
-                        outputStream = new CachingOutputStream(outputStream);
+                    setMimeTypeForReader(environment);
+                    if (this.reader.shouldSetContentLength()) {
+                        ByteArrayOutputStream os = new ByteArrayOutputStream();
+                        this.reader.setOutputStream(os);
+                        this.reader.generate();
+                        environment.setContentLength(os.size());
+                        if (outputStream == null) {
+                            outputStream = environment.getOutputStream(0);
+                        }
+                        os.writeTo(outputStream);
                     } else {
-                        pcKey = null;
+                        if (outputStream == null) {
+                            outputStream = environment.getOutputStream(this.outputBufferSize);
+                        }
+                        this.reader.setOutputStream(outputStream);
+                        this.reader.generate();
                     }
-                }
 
-                setMimeTypeForReader(environment);
-                if (this.reader.shouldSetContentLength()) {
-                    ByteArrayOutputStream os = new ByteArrayOutputStream();
-                    this.reader.setOutputStream(os);
-                    this.reader.generate();
-                    environment.setContentLength(os.size());
-                    if (outputStream == null) {
-                        outputStream = environment.getOutputStream(0);
+                    // store the response
+                    if (pcKey != null && readerValidity != null) {
+                        final CachedResponse res = new CachedResponse(new SourceValidity[] {readerValidity},
+                                ((CachingOutputStream)outputStream).getContent());
+                        res.setContentType(environment.getContentType());
+                        this.cache.store(pcKey, res);
                     }
-                    os.writeTo(outputStream);
-                } else {
-                    if (outputStream == null) {
-                        outputStream = environment.getOutputStream(this.outputBufferSize);
+
+                } finally {
+                    if (pcKey != null) {
+                        releaseLock(pcKey);
                     }
-                    this.reader.setOutputStream(outputStream);
-                    this.reader.generate();
                 }
 
-                // store the response
-                if (pcKey != null) {
-                    final CachedResponse res = new CachedResponse(new SourceValidity[] {readerValidity},
-                            ((CachingOutputStream)outputStream).getContent());
-                    res.setContentType(environment.getContentType());
-                    this.cache.store(pcKey, res);
-                }
             }
         } catch (Exception e) {
             handleException(e);
@@ -799,10 +920,10 @@
                     this.firstNotCacheableTransformerIndex < super.transformers.size()) {
                 // Cache contains only partial pipeline.
                 return null;
-            }
+                    }
 
             if (this.toCacheSourceValidities != null) {
-                // This means the pipeline is valid based on the validities
+                // This means that the pipeline is valid based on the validities
                 // of the individual components
                 final AggregatedValidity validity = new AggregatedValidity();
                 for (int i=0; i < this.toCacheSourceValidities.length; i++) {
@@ -825,7 +946,7 @@
                     && !this.completeResponseIsCached
                     && this.firstProcessedTransformerIndex == super.transformers.size()) {
                 vals = this.fromCacheKey.size();
-            }
+                    }
 
             if (vals > 0) {
                 final AggregatedValidity validity = new AggregatedValidity();
@@ -849,15 +970,15 @@
         }
 
         if (null != this.toCacheKey
-             && !this.cacheCompleteResponse
-             && this.firstNotCacheableTransformerIndex == super.transformers.size()) {
-             return String.valueOf(HashUtil.hash(this.toCacheKey.toString()));
-        }
+                && !this.cacheCompleteResponse
+                && this.firstNotCacheableTransformerIndex == super.transformers.size()) {
+            return String.valueOf(HashUtil.hash(this.toCacheKey.toString()));
+                }
         if (null != this.fromCacheKey
-             && !this.completeResponseIsCached
-             && this.firstProcessedTransformerIndex == super.transformers.size()) {
+                && !this.completeResponseIsCached
+                && this.firstProcessedTransformerIndex == super.transformers.size()) {
             return String.valueOf(HashUtil.hash(this.fromCacheKey.toString()));
-        }
+                }
 
         return null;
     }
@@ -896,7 +1017,6 @@
      * Recyclable Interface
      */
     public void recycle() {
-
         this.generatorRole = null;
         this.transformerRoles.clear();
         this.serializerRole = null;