Posted to jcs-users@jakarta.apache.org by "Smuts, Aaron" <aa...@amazon.com> on 2005/01/06 19:46:00 UTC

CacheEventQueue thread pool configuration

I want to make it possible to configure the pool used by the CacheEventQueue. By default they can all share one pool, but each auxiliary could be configured to specify a pool name. That way you can control the number of threads used across the system.
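Here is a minimal sketch of the named-pool idea. It assumes java.util.concurrent (JDK 5 and later) rather than the concurrent package discussed in this thread. EventPoolRegistry, DEFAULT_POOL, definePool and getPool are hypothetical names, not JCS API. An auxiliary whose configuration names no pool, or names an unknown one, falls back to the shared default. As a result, the number of event threads across the system is capped by the set of defined pools.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Hypothetical registry of named thread pools for cache event queues (not JCS API).
 * Auxiliaries that name no pool, or an unknown one, share the default pool.
 */
class EventPoolRegistry {
    static final String DEFAULT_POOL = "default";

    private final Map<String, ExecutorService> pools = new HashMap<String, ExecutorService>();

    EventPoolRegistry(int defaultPoolSize) {
        pools.put(DEFAULT_POOL, Executors.newFixedThreadPool(defaultPoolSize));
    }

    /** Defines a named pool, for example from an auxiliary's configuration. */
    synchronized void definePool(String name, int threads) {
        if (pools.containsKey(name)) {
            throw new IllegalArgumentException("pool already defined: " + name);
        }
        pools.put(name, Executors.newFixedThreadPool(threads));
    }

    /** Returns the named pool, or the shared default when the name is null or unknown. */
    synchronized ExecutorService getPool(String name) {
        ExecutorService pool = (name == null) ? null : pools.get(name);
        return (pool != null) ? pool : pools.get(DEFAULT_POOL);
    }

    /** Shuts down every pool, e.g. when the cache system is disposed. */
    synchronized void shutdownAll() {
        for (ExecutorService pool : pools.values()) {
            pool.shutdown();
        }
    }
}
```

An auxiliary would call something like registry.getPool(configuredPoolName) when it builds its event queue. The configuration property that would carry that name is left open here.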

I'm wondering what kind of pool behavior is right for these queues.

If the pool uses an unbounded queue, it will never create more than the minimum number of threads. If we bound the queue, then we have to decide what to do when we reach the maximum number of threads: do we drop the oldest event, block, or run it in the current thread?
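This trade-off maps directly onto java.util.concurrent.ThreadPoolExecutor (JDK 5 and later). That class is used here as an assumption, since the experimental queue uses the older PooledExecutor. The hypothetical EventPoolChoices helper below builds two kinds of pool:

- an unbounded-queue pool, which never grows past its core size;
- a bounded-queue pool whose saturation policy is one of the JDK's built-in handlers, either DiscardOldestPolicy ("drop the oldest") or CallerRunsPolicy ("run in the current thread").

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Hypothetical factory contrasting the queueing choices for event pools (not JCS API). */
final class EventPoolChoices {
    private static final long KEEP_ALIVE_SECONDS = 60L;

    private EventPoolChoices() {
    }

    /**
     * Unbounded queue: once corePoolSize threads exist, every new task is queued,
     * so the pool never grows toward maxPoolSize.
     */
    static ThreadPoolExecutor unbounded(int corePoolSize, int maxPoolSize) {
        return new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    /**
     * Bounded queue: when it is full, extra threads are started up to maxPoolSize;
     * after that, whenSaturated decides what happens to the new task.
     */
    static ThreadPoolExecutor bounded(int corePoolSize, int maxPoolSize, int queueCapacity,
                                      RejectedExecutionHandler whenSaturated) {
        return new ThreadPoolExecutor(corePoolSize, maxPoolSize,
                KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity), whenSaturated);
    }

    /** "Drop the oldest": discard the task at the head of the queue, then retry. */
    static RejectedExecutionHandler dropOldest() {
        return new ThreadPoolExecutor.DiscardOldestPolicy();
    }

    /** "Run in the current thread": the thread that called execute() runs the task itself. */
    static RejectedExecutionHandler runInCaller() {
        return new ThreadPoolExecutor.CallerRunsPolicy();
    }
}
```

The JDK's default handler, AbortPolicy, throws instead. It has no built-in handler that blocks the submitting thread, so "block" would need a custom RejectedExecutionHandler. Blocking also has a cost here: the submitter can be a thread such as the ShrinkerThread spooling to disk, which would then stall until the queue drains.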

 

-----Original Message-----
From: Travis Savo [mailto:tsavo@IFILM.com] 
Sent: Tuesday, January 04, 2005 1:11 PM
To: 'Turbine JCS Users List'
Subject: RE: Thread deadlock in CacheEventQueue class

This is already fixed in the experimental CacheEventQueue, which uses a PooledExecutor from the concurrent package instead of 'rolling its own'.
Perhaps we should give more thought to moving that piece into HEAD? I've been using it in production for over 3 months now and it's never given me any problems (unlike the old one; thank you for fixing it, because I was stumped on it and eventually just gave up and changed it to use a PooledExecutor).
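The benefit of delegating to an executor is that the queue no longer manages its own worker thread, idle timeout and queueLock handshake. Below is a sketch of that shape. It assumes ThreadPoolExecutor with allowCoreThreadTimeOut (JDK 6 and later) in place of PooledExecutor, and ExecutorEventQueue and its methods are hypothetical. A single worker keeps events in submission order, and the idle timeout plays the role that waitToDieMillis plays in the hand-rolled queue.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical event queue that delegates threading to an executor instead of
 * running its own worker thread, queue lock and idle-shutdown logic.
 */
final class ExecutorEventQueue {
    private final ThreadPoolExecutor executor;

    /** @param idleMillis how long the worker may sit idle before it exits; must be > 0 */
    ExecutorEventQueue(long idleMillis) {
        // One worker thread keeps events in submission order; the unbounded queue never rejects.
        executor = new ThreadPoolExecutor(1, 1, idleMillis, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        // Let the worker exit after idleMillis without work (similar in spirit to
        // waitToDieMillis); the executor starts a fresh thread for the next event.
        executor.allowCoreThreadTimeOut(true);
    }

    /** Queues an event; it runs on the worker thread, not the caller's. */
    void addEvent(Runnable event) {
        executor.execute(event);
    }

    /** Number of live worker threads; drops to 0 once the worker has idled out. */
    int liveThreads() {
        return executor.getPoolSize();
    }

    /** Stops accepting events, lets queued ones finish, and reports whether they did in time. */
    boolean destroy(long timeoutMillis) throws InterruptedException {
        executor.shutdown();
        return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

Because the executor creates and retires the thread itself, this sketch has no stopProcessing() call competing with producers for the queue's monitor.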

-Travis Savo

-----Original Message-----
From: Smuts, Aaron [mailto:aaronsm@amazon.com]
Sent: Tuesday, January 04, 2005 12:31 PM
To: Turbine JCS Users List
Subject: RE: Thread deadlock in CacheEventQueue class

I managed to get it to happen in a unit test and verified that the change fixes the problem.  I'll put it in tonight.

The runPutDelayTest method below (registered as testRunPutDelayTest) can trigger it. Getting the timing just right is very tricky.



package org.apache.jcs.engine;

/*
 * Copyright 2001-2004 The Apache Software Foundation.
 *
 * Licensed under the Apache License, Version 2.0 (the "License")
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import java.io.IOException;
import java.io.Serializable;

import org.apache.jcs.JCS;
import org.apache.jcs.TestDiskCacheConcurrent;
import org.apache.jcs.engine.behavior.ICacheElement;
import org.apache.jcs.engine.behavior.ICacheListener;

import junit.extensions.ActiveTestSuite;
import junit.framework.Test;
import junit.framework.TestCase;

/**
 * This test case is designed to make sure there are no deadlocks in the event
 * queue. The time to live should be set to a very short interval to make a
 * deadlock more likely.
 *
 * @author Aaron Smuts
 */
public class TestEventQueueConcurrent extends TestCase {

    private static CacheEventQueue queue = null;

    private static CacheListenerImpl listen = null;

    private int maxFailure = 3;

    private int waitBeforeRetry = 100;

    // very small idle time
    private int idleTime = 2;

    /**
     * Constructor for the TestEventQueueConcurrent object.
     */
    public TestEventQueueConcurrent(String testName)
    {
        super(testName);
    }

    /**
     * Main method passes this test to the text test runner.
     */
    public static void main(String args[])
    {
        String[] testCaseName =
        { TestEventQueueConcurrent.class.getName() };
        junit.textui.TestRunner.main(testCaseName);
    }

    /**
     * A unit test suite for JUnit
     * 
     * @return The test suite
     */
    public static Test suite()
    {

        ActiveTestSuite suite = new ActiveTestSuite();

        suite.addTest(new TestEventQueueConcurrent("testRunPutTest1")
        {
            public void runTest() throws Exception
            {
                this.runPutTest(200, 200);
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testRunPutTest2")
        {
            public void runTest() throws Exception
            {
                this.runPutTest(1200, 1400);
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testRunRemoveTest1")
        {
            public void runTest() throws Exception
            {
                this.runRemoveTest(2200);
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testStopProcessing1")
        {
            public void runTest() throws Exception
            {
                this.runStopProcessingTest();
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testRunPutTest4")
        {
            public void runTest() throws Exception
            {
                this.runPutTest(5200, 6600);
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testRunRemoveTest2")
        {
            public void runTest() throws Exception
            {
                this.runRemoveTest(5200);
            }
        });

        suite.addTest(new TestEventQueueConcurrent("testStopProcessing2")
        {
            public void runTest() throws Exception
            {
                this.runStopProcessingTest();
            }
        });

        
        suite.addTest(new TestEventQueueConcurrent("testRunPutDelayTest")
        {
            public void runTest() throws Exception
            {
                this.runPutDelayTest(100, 6700);
            }
        });
        
        return suite;
    }

    /**
     * Test setup. Create the static queue to be used by all tests
     */
    public void setUp()
    {
        listen = new CacheListenerImpl();
        queue = new CacheEventQueue(listen, 1L, "testCache1", maxFailure, waitBeforeRetry);

        queue.setWaitToDieMillis(idleTime);
    }

    /**
     * Adds put events to the queue.
     * 
     * @param end
     * @param expectedPutCount
     * @throws Exception
     */
    public void runPutTest(int end, int expectedPutCount) throws Exception
    {
        for (int i = 0; i <= end; i++)
        {
            CacheElement elem = new CacheElement("testCache1", i + ":key", i + "data");
            queue.addPutEvent(elem);
        }

        while (!queue.isEmpty())
        {
            synchronized (this)
            {
                System.out.println("queue is still busy, waiting 250 millis");
                this.wait(250);
            }
        }
        System.out.println("queue is empty, comparing putCount");

        // This becomes less accurate with each test. It should never fail. If
        // it does, things are very off.
        assertTrue("The put count [" + listen.putCount
            + "] is below the expected minimum threshold", listen.putCount >= expectedPutCount);

    }

    /**
     * Add remove events to the event queue.
     * 
     * @param end
     * @throws Exception
     */
    public void runRemoveTest(int end) throws Exception
    {
        for (int i = 0; i <= end; i++)
        {
            queue.addRemoveEvent(i + ":key");
        }

    }

    /**
     * Stops processing on the event queue.
     * 
     * @throws Exception
     */
    public void runStopProcessingTest() throws Exception
    {
        queue.stopProcessing();
    }

    /**
     * Adds put events with short delays between them. Waits until the queue is empty before starting.
     * 
     * @param end
     * @param expectedPutCount
     * @throws Exception
     */
    public void runPutDelayTest(int end, int expectedPutCount) throws Exception
    {
        while (!queue.isEmpty())
        {
            synchronized (this)
            {
                System.out.println("queue is busy, waiting 250 millis to begin");
                this.wait(250);
            }
        }
        System.out.println("queue is empty, begin");

        // get it going
        CacheElement elem = new CacheElement("testCache1", "key","data");
        queue.addPutEvent(elem);

        for (int i = 0; i <= end; i++)
        {
            synchronized (this)
            {
                if ( i % 2 == 0)
                {
                    this.wait(idleTime);                    
                }
                else 
                {
                    this.wait( idleTime / 2 );
                }
            }
            CacheElement elem2 = new CacheElement("testCache1", i + ":key", i + "data");
            queue.addPutEvent(elem2);
        }

        while (!queue.isEmpty())
        {
            synchronized (this)
            {
                System.out.println("queue is still busy, waiting 250 millis");
                this.wait(250);
            }
        }
        System.out.println("queue is empty, comparing putCount");

        // This becomes less accurate with each test. It should never fail. If
        // it does, things are very off.
        assertTrue("The put count [" + listen.putCount
            + "] is below the expected minimum threshold", listen.putCount >= expectedPutCount);

    }

    /**
     * This is a dummy cache listener to use when testing the event queue.
     */
    private class CacheListenerImpl implements ICacheListener
    {

        protected int putCount = 0;

        protected int removeCount = 0;

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#handlePut(org.apache.jcs.engine.behavior.ICacheElement)
         */
        public void handlePut(ICacheElement item) throws IOException
        {
            synchronized (this)
            {
                putCount++;
            }
        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemove(java.lang.String,
         *      java.io.Serializable)
         */
        public void handleRemove(String cacheName, Serializable key) throws IOException
        {
            synchronized (this)
            {
                removeCount++;
            }

        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#handleRemoveAll(java.lang.String)
         */
        public void handleRemoveAll(String cacheName) throws IOException
        {
            // TODO Auto-generated method stub

        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#handleDispose(java.lang.String)
         */
        public void handleDispose(String cacheName) throws IOException
        {
            // TODO Auto-generated method stub

        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#setListenerId(long)
         */
        public void setListenerId(long id) throws IOException
        {
            // TODO Auto-generated method stub

        }

        /*
         * (non-Javadoc)
         * 
         * @see org.apache.jcs.engine.behavior.ICacheListener#getListenerId()
         */
        public long getListenerId() throws IOException
        {
            // TODO Auto-generated method stub
            return 0;
        }

    }
} 

-----Original Message-----
From: Wyatt, Allen [mailto:Allen.Wyatt@travelocity.com]
Sent: Monday, January 03, 2005 1:47 PM
To: turbine-jcs-user@jakarta.apache.org
Subject: Thread deadlock in CacheEventQueue class

I've encountered a deadlock in the org.apache.jcs.engine.CacheEventQueue class. One thread synchronizes on the queueLock object and then tries synchronizing on the CacheEventQueue object itself, while another thread synchronizes on the CacheEventQueue object and then tries synchronizing on the queueLock object. Here are the stacks of the threads:

Thread #1:
----------
at org.apache.jcs.engine.CacheEventQueue.put(CacheEventQueue.java:299)
    - waiting to lock <0x546eaac0> (a java.lang.Object (the queueLock object))
at org.apache.jcs.engine.CacheEventQueue.addPutEvent(CacheEventQueue.java:211)
    - locked <0x546ea628> (a org.apache.jcs.engine.CacheEventQueue)
at org.apache.jcs.auxiliary.disk.AbstractDiskCache.update(AbstractDiskCache.java:148)
at org.apache.jcs.engine.control.CompositeCache.spoolToDisk(CompositeCache.java:346)
at org.apache.jcs.engine.memory.AbstractMemoryCache.waterfal(AbstractMemoryCache.java:230)
at org.apache.jcs.engine.memory.shrinking.ShrinkerThread.shrink(ShrinkerThread.java:247)
at org.apache.jcs.engine.memory.shrinking.ShrinkerThread.run(ShrinkerThread.java:119)

Thread #2 (running the QProcessor inner class):
----------
at org.apache.jcs.engine.CacheEventQueue.stopProcessing(CacheEventQueue.java:126)
    - waiting to lock <0x546ea628> (a org.apache.jcs.engine.CacheEventQueue)
at org.apache.jcs.engine.CacheEventQueue$QProcessor.run(CacheEventQueue.java:454)
    - locked <0x546eaac0> (a java.lang.Object (the queueLock object))
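The two stacks show a lock-order inversion. Thread #1 holds the CacheEventQueue monitor and wants queueLock, while thread #2 holds queueLock and wants the monitor. A hypothetical LockInversionDemo can reproduce the pattern with two plain objects standing in for those locks. It then lets the JVM confirm the deadlock through ThreadMXBean.findDeadlockedThreads() (JDK 6 and later). This is a sketch for illustration only, and it intentionally leaves two daemon threads deadlocked.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

/**
 * Hypothetical reproduction of the inversion above: one thread takes queueLock and then
 * the queue's monitor, the other takes them in the opposite order. Each thread grabs its
 * first lock and waits until the other has done the same, so the deadlock is certain.
 */
final class LockInversionDemo {
    private final Object queueLock = new Object();    // stands in for CacheEventQueue.queueLock
    private final Object queueMonitor = new Object(); // stands in for the CacheEventQueue instance

    /** Starts both threads and returns the IDs the JVM reports as deadlocked (empty if none). */
    long[] runAndDetect(long timeoutMillis) throws InterruptedException {
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        daemon("processor", queueLock, queueMonitor, bothHoldFirstLock).start();
        daemon("producer", queueMonitor, queueLock, bothHoldFirstLock).start();

        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        long deadline = System.currentTimeMillis() + timeoutMillis;
        long[] ids = threads.findDeadlockedThreads(); // null when no deadlock exists
        while (ids == null && System.currentTimeMillis() < deadline) {
            Thread.sleep(10);
            ids = threads.findDeadlockedThreads();
        }
        return ids == null ? new long[0] : ids;
    }

    private static Thread daemon(String name, final Object first, final Object second,
                                 final CountDownLatch latch) {
        Thread t = new Thread(new Runnable() {
            public void run() {
                synchronized (first) {
                    latch.countDown();
                    try {
                        latch.await(); // wait until the other thread holds its first lock
                    } catch (InterruptedException e) {
                        return;
                    }
                    synchronized (second) {
                        // never reached: each thread waits for the lock the other holds
                    }
                }
            }
        }, name);
        t.setDaemon(true); // deadlocked threads must not keep the JVM alive
        return t;
    }
}
```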

Is this a known problem?  Is there a fix?  I tried looking at cvs.apache.org/viewcvs/jakarta-turbine-jcs/.../CacheEventQueue.java and the code doesn't have a fix as far as I can tell.

I was thinking this could be fixed by changing the code in the QProcessor inner class's run() method from:

    public void run()
    {
        AbstractCacheEvent r = null;

        while ( queue.isAlive() )
        {
            r = queue.take();
    
            if ( log.isDebugEnabled() )
            {
                log.debug( "Event from queue = " + r );
            }

            if ( r == null )
            {
                synchronized ( queueLock )
                {
                    try
                    {
                        queueLock.wait( queue.getWaitToDieMillis() );
                    }
                    catch ( InterruptedException e )
                    {
                        log.warn(
                            "Interrupted while waiting for another event to come in before we die." );
                        return;
                    }
                    r = queue.take();
                    if ( log.isDebugEnabled() )
                    {
                        log.debug( "Event from queue after sleep = " + r );
                    }
                    if ( r == null )
                    {
                        queue.stopProcessing();
                    }
                }
            }

            if ( queue.isWorking() && queue.isAlive() && r != null )
            {
                r.run();
            }
        }
        if ( log.isInfoEnabled() )
        {
            log.info( "QProcessor exiting for " + queue );
        }
    }

to the following code:

    public void run()
    {
        AbstractCacheEvent r = null;

        while ( queue.isAlive() )
        {
            r = queue.take();
    
            if ( log.isDebugEnabled() )
            {
                log.debug( "Event from queue = " + r );
            }

            if ( r == null )
            {
                synchronized ( queueLock )
                {
                    try
                    {
                        queueLock.wait( queue.getWaitToDieMillis() );
                    }
                    catch ( InterruptedException e )
                    {
                        log.warn(
                            "Interrupted while waiting for another event to come in before we die." );
                        return;
                    }
                    r = queue.take();
                    if ( log.isDebugEnabled() )
                    {
                        log.debug( "Event from queue after sleep = " + r );
                    }
                    /*** MOVED CODE FROM HERE (inside synchronized block) TO BELOW (outside synchronized block) ***/
                }
                /*** MOVED CODE STARTS BELOW: ****/
                if ( r == null )
                {
                    queue.stopProcessing();
                }
                /*** END OF MOVED CODE ****/
            }

            if ( queue.isWorking() && queue.isAlive() && r != null )
            {
                r.run();
            }
        }
        if ( log.isInfoEnabled() )
        {
            log.info( "QProcessor exiting for " + queue );
        }
    }

Does this sound reasonable?
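The proposed change works because the worker no longer calls the synchronized stopProcessing() while holding queueLock. Every path therefore acquires the queue's monitor before queueLock, which is the same order addPutEvent uses in thread #1's stack. The hypothetical IdleStoppingQueue below models that rule in isolation; it is not the JCS class.

One detail is an assumption of this sketch and not part of the proposal. The synchronized stop method re-checks for events under queueLock. Without that check, an event added between the timed wait and the stop would be left with no worker to run it.

```java
import java.util.LinkedList;

/**
 * Hypothetical, stripped-down model of the proposed QProcessor change (not the JCS class).
 * Lock order everywhere: this queue's monitor first, then queueLock.
 */
final class IdleStoppingQueue {
    private final Object queueLock = new Object();
    private final LinkedList<Runnable> events = new LinkedList<Runnable>(); // guarded by queueLock
    private final long waitToDieMillis;
    private boolean alive;      // guarded by this
    private int workersStarted; // guarded by this; exposed only for illustration

    IdleStoppingQueue(long waitToDieMillis) {
        this.waitToDieMillis = waitToDieMillis;
    }

    /** Like addPutEvent: holds this queue's monitor, then briefly takes queueLock. */
    synchronized void addEvent(Runnable event) {
        synchronized (queueLock) {
            events.addLast(event);
            queueLock.notifyAll();
        }
        if (!alive) {
            alive = true;
            workersStarted++;
            Thread worker = new Thread(new Runnable() {
                public void run() {
                    processEvents();
                }
            }, "event-queue-worker");
            worker.setDaemon(true);
            worker.start();
        }
    }

    synchronized boolean isAlive() {
        return alive;
    }

    synchronized int workersStarted() {
        return workersStarted;
    }

    /** Same lock order as addEvent; refuses to stop if an event slipped in after the wait. */
    private synchronized boolean stopIfIdle() {
        synchronized (queueLock) {
            if (!events.isEmpty()) {
                return false;
            }
        }
        alive = false;
        return true;
    }

    private void processEvents() {
        while (true) {
            Runnable r;
            synchronized (queueLock) {
                r = events.poll();
                if (r == null) {
                    try {
                        queueLock.wait(waitToDieMillis);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    r = events.poll();
                }
            }
            // queueLock is released before stopIfIdle() takes this queue's monitor,
            // so the worker never holds the two locks in the opposite order.
            if (r == null && stopIfIdle()) {
                return;
            }
            if (r != null) {
                r.run();
            }
        }
    }
}
```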

---------------------------------------------------------------------
To unsubscribe, e-mail: turbine-jcs-user-unsubscribe@jakarta.apache.org
For additional commands, e-mail: turbine-jcs-user-help@jakarta.apache.org
