You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@avalon.apache.org by le...@apache.org on 2001/04/15 19:06:28 UTC

cvs commit: jakarta-avalon/src/java/org/apache/avalon/util/thread/semaphore ConditionalEvent.java DjikstraSemaphore.java ThreadBarrier.java

leosimons    01/04/15 10:06:28

  Added:       src/java/org/apache/avalon/util/thread/semaphore
                        ConditionalEvent.java DjikstraSemaphore.java
                        ThreadBarrier.java
  Log:
  adding some useful threading utilities.
  
  Submitted by Karthik Rangaraju <kr...@sapient.com>.
  
  Revision  Changes    Path
  1.1                  jakarta-avalon/src/java/org/apache/avalon/util/thread/semaphore/ConditionalEvent.java
  
  Index: ConditionalEvent.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   */
  package org.apache.avalon.util.thread.semaphore;
  
  /**
   * This class implements a POSIX style "Event" object. The difference
   * between the ConditionalEvent and the java wait()/notify() technique is in
   * handling of event state. If a ConditionalEvent is signalled, a thread
   * that subsequently waits on it is immediately released. In case of auto
   * reset EventObjects, the object resets (unsignalled) itself as soon as it
   * is signalled and waiting thread(s) are released (based on whether signal()
   * or signalAll() was called).
   *
   * @author <a href="mailto:kranga@sapient.com">Karthik Rangaraju</a>
   */
  public class ConditionalEvent
  {
      private boolean m_state = false;
      private boolean m_autoReset = false;
      
      // TODO: Need to add methods that block until a specified time and
      // return (though in real-life, I've never known what to do if a thread
      // timesout other than call the method again)!
      
      /**
       * Creates a manual reset ConditionalEvent with a specified initial state
       * @param pInitialState Sets the initial state of the ConditionalEvent.
       * Signalled if pInitialState is true, unsignalled otherwise.
       */
      public ConditionalEvent( boolean initialState )
      {
          m_state = initialState;
      }
      
      /**
       * Creates a ConditionalEvent with the defined initial state
       * @param pInitialState if true, the ConditionalEvent is signalled when
       * created.
       * @param pAutoReset if true creates an auto-reset ConditionalEvent
       */
      public ConditionalEvent( boolean initialState, boolean autoReset )
      {
          m_state = initialState;
          m_autoReset = autoReset;
      }
      
      /**
       * Checks if the event is signalled. Does not block on the operation
       * @return true is event is signalled, false otherwise. Does not reset
       * an autoreset event
       */
      public boolean isSignalled()
      {
          return m_state;
      }
  
      /**
       * Signals the event. A single thread blocked on waitForSignal() is released
       * @see #signalAll()
       * @see #waitForSignal()
       */
      public void signal()
      {
          synchronized ( this )
          {
              m_state = true;
              notify();
          }
      }
  
      /**
       * Current implementation only works with manual reset events. Releases
       * all threads blocked on waitForSignal()
       * @see #waitForSignal()
       */
      public void signalAll()
      {
          synchronized ( this )
          {
              m_state = true;
              notifyAll();
          }
      }
  
      /**
       * Resets the event to an unsignalled state
       */
      public void reset()
      {
          synchronized ( this )
          {
              m_state = false;
          }
      }
      
      /**
       * If the event is signalled, this method returns immediately resetting the
       * signal, otherwise it blocks until the event is signalled.
       * @throws InterruptedException if the thread is interrupted when blocked
       */
      public void waitForSignal()
          throws InterruptedException
      {
          synchronized ( this )
          {
              while ( m_state == false )
              {
                  wait();
              }
              if ( m_autoReset == true )
              {
                  m_state = false;
              }
          }
      }
  
  }
  
  
  
  1.1                  jakarta-avalon/src/java/org/apache/avalon/util/thread/semaphore/DjikstraSemaphore.java
  
  Index: DjikstraSemaphore.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   */
  package org.apache.avalon.util.thread.semaphore;
  
  /**
   * Also called counting semaphores, Djikstra semaphores are used to control
   * access to a set of resources. A Djikstra semaphore has a count associated
   * with it and each acquire() call reduces the count. A thread that tries to
   * acquire() a Djikstra semaphore with a zero count blocks until someone else
   * calls release() thus increasing the count.
   *
   * @author <a href="mailto:kranga@sapient.com">Karthik Rangaraju</a>
   */
  public class DjikstraSemaphore
  {
      private int m_count;
      private int m_maxCount;
      private Object m_starvationLock = new Object();
  
      /**
       * Creates a Djikstra semaphore with the specified max count and initial
       * count set to the max count (all resources released)
       * @param pMaxCount is the max semaphores that can be acquired
       */
      public DjikstraSemaphore( int maxCount )
      {
          this( maxCount, maxCount);
      }
  
      /**
       * Creates a Djikstra semaphore with the specified max count and an initial
       * count of acquire() operations that are assumed to have already been
       * performed.
       * @param pMaxCount is the max semaphores that can be acquired
       * @pInitialCount is the current count (setting it to zero means all
       * semaphores have already been acquired). 0 <= pInitialCount <= pMaxCount
       */
      public DjikstraSemaphore( int maxCount, int initialCount )
      {
          m_count = initialCount;
          m_maxCount = maxCount;
      }
  
      /**
       * If the count is non-zero, acquires a semaphore and decrements the count
       * by 1, otherwise blocks until a release() is executed by some other thread.
       * @throws InterruptedException is the thread is interrupted when blocked
       * @see #tryAcquire()
       * @see #acquireAll()
       */
      public void acquire()
          throws InterruptedException
      {
          synchronized ( this )
          {
              // Using a spin lock to take care of rogue threads that can enter
              // before a thread that has exited the wait state acquires the monitor
              while ( m_count == 0 )
              {
                  wait();
              }
              m_count--;
              synchronized ( m_starvationLock )
              {
                  if ( m_count == 0 )
                  {
                      m_starvationLock.notify();
                  }
              }
          }
      }
  
      /**
       * Non-blocking version of acquire().
       * @return true if semaphore was acquired (count is decremented by 1), false
       * otherwise
       */
      public boolean tryAcquire()
      {
          synchronized ( this )
          {
              if ( m_count != 0 )
              {
                  m_count--;
                  synchronized ( m_starvationLock )
                  {
                      if ( m_count == 0 )
                      {
                          m_starvationLock.notify();
                      }
                  }
                  return true;
              }
              else
              {
                  return false;
              }
          }
      }
  
      /**
       * Releases a previously acquires semaphore and increments the count by one.
       * Does not check if the thread releasing the semaphore was a thread that
       * acquired the semaphore previously. If more releases are performed than
       * acquires, the count is not increased beyond the max count specified during
       * construction.
       * @see #release( int pCount )
       * @see #releaseAll()
       */
      public void release()
      {
          synchronized ( this )
          {
              m_count++;
              if ( m_count > m_maxCount )
              {
                  m_count = m_maxCount;
              }
              notify();
          }
      }
  
      /**
       * Same as release() except that the count is increased by pCount instead
       * of 1. The resulting count is capped at max count specified in the
       * constructor
       * @param pCount is the amount by which the counter should be incremented
       * @see #release()
       */
      public void release(int count)
      {
          synchronized ( this )
          {
              if ( m_count + count > m_maxCount )
              {
                  m_count = m_maxCount;
              }
              else
              {
                  m_count += count;
              }
              notifyAll();
          }
      }
  
      /**
       * Tries to acquire all the semaphores thus bringing the count to zero.
       * @throws InterruptedException if the thread is interrupted when blocked on
       * this call
       * @see #acquire()
       * @see #releaseAll()
       */
      public void acquireAll()
          throws InterruptedException
      {
          synchronized ( this )
          {
              for ( int index = 0; index < m_maxCount; index++ )
              {
                  acquire();
              }
          }
      }
  
      /**
       * Releases all semaphores setting the count to max count.
       * Warning: If this method is called by a thread that did not make a
       * corresponding acquireAll() call, then you better know what you are doing!
       * @see #acquireAll()
       */
      public void releaseAll()
      {
          synchronized ( this )
          {
              release( m_maxCount );
              notifyAll();
          }
      }
  
      /**
       * This method blocks the calling thread until the count drops to zero.
       * The method is not stateful and hence a drop to zero will not be recognized
       * if a release happens before this call. You can use this method to implement
       * threads that dynamically increase the resource pool or that log occurences
       * of resource starvation. Also called a reverse-sensing semaphore
       * @throws InterruptedException if the thread is interrupted while waiting
       */
      public void starvationCheck()
          throws InterruptedException
      {
          synchronized ( m_starvationLock )
          {
              if ( m_count != 0 )
              {
                  m_starvationLock.wait();
              }
          }
      }
  }
  
  
  
  
  1.1                  jakarta-avalon/src/java/org/apache/avalon/util/thread/semaphore/ThreadBarrier.java
  
  Index: ThreadBarrier.java
  ===================================================================
  /*
   * Copyright (C) The Apache Software Foundation. All rights reserved.
   *
   * This software is published under the terms of the Apache Software License
   * version 1.1, a copy of which has been included with this distribution in
   * the LICENSE file.
   */
  package org.apache.avalon.util.thread.semaphore;
  
  /**
   * A thread barrier blocks all threads hitting it until a pre-defined number
   * of threads arrive at the barrier. This is useful for implementing release
   * consistent concurrency where you don't want to take the performance penalty
   * of providing mutual exclusion to shared resources
   *
   * @author <a href="mailto:kranga@sapient.com">Karthik Rangaraju</a>
   */
  public class ThreadBarrier
  {
      private int m_threshold;
      private int m_count;
  
      /**
       * Initializes a thread barrier object with a given thread count
       * @param pCount is the number of threads that need to block on
       * barrierSynchronize() before they will be allowed to pass through
       * @see #barrierSynchronize()
       */
      public ThreadBarrier( int count )
      {
          m_threshold = count;
          m_count = 0;
      }
  
      /**
       * This method blocks all threads calling it until the threshold number of
       * threads have called it. It then releases all threads blocked by it
       * @throws InterruptedException if any thread blocked during the call is
       * interrupted
       */
      public void barrierSynchronize()
          throws InterruptedException
      {
          synchronized ( this )
          {
              if ( m_count != m_threshold - 1 )
              {
                  m_count++;
                  wait();
              }
              else
              {
                  m_count = 0;
                  notifyAll();
              }
          }
      }
  
  }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: avalon-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: avalon-dev-help@jakarta.apache.org