You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jcs-dev@jakarta.apache.org by as...@apache.org on 2009/07/16 22:14:43 UTC

svn commit: r794825 [1/2] - in /jakarta/jcs/trunk/src: java/org/apache/jcs/auxiliary/lateral/ java/org/apache/jcs/auxiliary/lateral/socket/tcp/ java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/ java/org/apache/jcs/auxiliary/lateral/socket/tcp...

Author: asmuts
Date: Thu Jul 16 20:14:42 2009
New Revision: 794825

URL: http://svn.apache.org/viewvc?rev=794825&view=rev
Log:
Moving UDP discovery out of the tcp lateral package.  This isn't finished yet.  I'm trying to make it usable by other auxiliaries.  

Added:
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/behavior/
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/DiscoveredService.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/DiscoveryShutdownHook.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPCleanupRunner.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryAttributes.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryInfo.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java   (contents, props changed)
      - copied, changed from r781592, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryManager.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryMessage.java   (contents, props changed)
      - copied, changed from r781592, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryMessage.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java   (contents, props changed)
      - copied, changed from r792552, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryReceiver.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java   (contents, props changed)
      - copied, changed from r781592, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoverySender.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySenderThread.java   (contents, props changed)
      - copied, changed from r781592, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoverySenderThread.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java   (contents, props changed)
      - copied, changed from r781592, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryService.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/behavior/
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/behavior/IDiscoveryListener.java
    jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/MockLateralCache.java   (contents, props changed)
      - copied, changed from r781592, jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/MockLateralCache.java
    jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/
    jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/MockDiscoveryListener.java
    jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoverySenderUnitTest.java
    jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryServiceUnitTest.java
    jakarta/jcs/trunk/src/test/org/apache/jcs/utils/discovery/UDPDiscoveryUnitTest.java   (contents, props changed)
      - copied, changed from r781592, jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryUnitTest.java
Removed:
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryManager.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryMessage.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryReceiver.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoverySender.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoverySenderThread.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryService.java
    jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/MockLateralCache.java
    jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryUnitTest.java
Modified:
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPSender.java
    jakarta/jcs/trunk/src/test-conf/log4j.properties

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java?rev=794825&r1=794824&r2=794825&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java Thu Jul 16 20:14:42 2009
@@ -90,7 +90,7 @@
      * @param noWait
      * @return true if it wasn't alreay contained
      */
-    public boolean addNoWait( LateralCacheNoWait noWait )
+    public synchronized boolean addNoWait( LateralCacheNoWait noWait )
     {
         if ( noWait == null )
         {
@@ -120,9 +120,50 @@
         noWaits = newArray;
 
         return true;
+    }
 
+    /**
+     * Removes a no wait from the array, if present, by swapping in a copy without it.
+     * <p>
+     * @param noWait the no wait to remove; null is ignored
+     * @return true if it was in the array and has been removed
+     */
+    public synchronized boolean removeNoWait( LateralCacheNoWait noWait )
+    {
+        if ( noWait == null )
+        {
+            return false;
+        }
+
+        int position = -1;
+        for ( int i = 0; i < noWaits.length; i++ )
+        {
+            // we know noWait isn't null
+            if ( noWait.equals( noWaits[i] ) )
+            {
+                position = i;
+            }
+        }
+
+        if ( position == -1 )
+        {
+            return false;
+        }
+        
+        LateralCacheNoWait[] newArray = new LateralCacheNoWait[noWaits.length -1];
+
+        // Copy the elements before and after the removed slot.  Copying noWaits.length
+        // elements into the shorter array would throw IndexOutOfBoundsException; a
+        // zero length copy is a legal no-op, so position 0 and the last slot need no guard.
+        System.arraycopy( noWaits, 0, newArray, 0, position );
+        System.arraycopy( noWaits, position + 1, newArray, position, noWaits.length - position - 1 );
+
+        noWaits = newArray;
+
+        return true;
     }
 
+    
     /**
      * @param ce
      * @throws IOException

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java?rev=794825&r1=794824&r2=794825&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java Thu Jul 16 20:14:42 2009
@@ -31,12 +31,12 @@
 import org.apache.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
 import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
-import org.apache.jcs.auxiliary.lateral.socket.tcp.discovery.UDPDiscoveryManager;
-import org.apache.jcs.auxiliary.lateral.socket.tcp.discovery.UDPDiscoveryService;
 import org.apache.jcs.engine.behavior.ICache;
 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
 import org.apache.jcs.engine.behavior.IElementSerializer;
 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
+import org.apache.jcs.utils.discovery.UDPDiscoveryManager;
+import org.apache.jcs.utils.discovery.UDPDiscoveryService;
 
 /**
  * Constructs a LateralCacheNoWaitFacade for the given configuration. Each lateral service / local
@@ -154,8 +154,8 @@
      * @param lac ITCPLateralCacheAttributes
      * @param lcnwf
      * @param cacheMgr
-     * @param cacheEventLogger 
-     * @param elementSerializer 
+     * @param cacheEventLogger
+     * @param elementSerializer
      * @return null if none is created.
      */
     private UDPDiscoveryService createDiscoveryService( ITCPLateralCacheAttributes lac, LateralCacheNoWaitFacade lcnwf,
@@ -168,13 +168,19 @@
         // create the UDP discovery for the TCP lateral
         if ( lac.isUdpDiscoveryEnabled() )
         {
+            LateralTCPDiscoveryListener discoveryListener = new LateralTCPDiscoveryListener( cacheMgr, cacheEventLogger,
+                                                                                    elementSerializer );
+
+            discoveryListener.addNoWaitFacade( lcnwf, lac.getCacheName() );
+            
             // need a factory for this so it doesn't
             // get dereferenced, also we don't want one for every region.
-            discovery = UDPDiscoveryManager.getInstance().getService( lac, cacheMgr, cacheEventLogger,
+            discovery = UDPDiscoveryManager.getInstance().getService( lac.getUdpDiscoveryAddr(), lac.getUdpDiscoveryPort(), lac.getTcpListenerPort(), cacheMgr, cacheEventLogger,
                                                                       elementSerializer );
 
-            discovery.addNoWaitFacade( lcnwf, lac.getCacheName() );
-
+            discovery.addParticipatingCacheName( lac.getCacheName() );
+            discovery.setDiscoveryListener( discoveryListener );
+                        
             if ( log.isInfoEnabled() )
             {
                 log.info( "Created UDPDiscoveryService for TCP lateral cache." );

Added: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java?rev=794825&view=auto
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java (added)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java Thu Jul 16 20:14:42 2009
@@ -0,0 +1,208 @@
+package org.apache.jcs.auxiliary.lateral.socket.tcp;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes;
+import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
+import org.apache.jcs.auxiliary.lateral.LateralCacheNoWaitFacade;
+import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
+import org.apache.jcs.engine.behavior.ICache;
+import org.apache.jcs.engine.behavior.ICompositeCacheManager;
+import org.apache.jcs.engine.behavior.IElementSerializer;
+import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
+import org.apache.jcs.utils.discovery.DiscoveredService;
+import org.apache.jcs.utils.discovery.behavior.IDiscoveryListener;
+
+/** This knows how to add and remove discovered services. */
+public class LateralTCPDiscoveryListener
+    implements IDiscoveryListener
+{
+    /** The log factory */
+    private final static Log log = LogFactory.getLog( LateralTCPDiscoveryListener.class );
+
+    /**
+     * Map of no wait facades. these are used to determine which regions are locally configured to
+     * use laterals.
+     */
+    private Map facades = new HashMap();
+
+    /** The cache manager. */
+    private ICompositeCacheManager cacheMgr;
+
+    /** The event logger. */
+    protected ICacheEventLogger cacheEventLogger;
+
+    /** The serializer. */
+    protected IElementSerializer elementSerializer;
+
+    /**
+     * This plugs into the udp discovery system. It will receive add and remove events.
+     * <p>
+     * @param cacheMgr
+     * @param cacheEventLogger
+     * @param elementSerializer
+     */
+    protected LateralTCPDiscoveryListener( ICompositeCacheManager cacheMgr, ICacheEventLogger cacheEventLogger,
+                                           IElementSerializer elementSerializer )
+    {
+        this.cacheMgr = cacheMgr;
+        this.cacheEventLogger = cacheEventLogger;
+        this.elementSerializer = elementSerializer;
+    }
+
+    /**
+     * Registers a nowait facade under this cache name. An existing entry is replaced.
+     * <p>
+     * The facades map is what addNoWait and removeNoWait consult for a region name. A region
+     * with no facade registered here is treated as not configured to use the lateral cache.
+     * <p>
+     * @param facade the facade to register
+     * @param cacheName the region name used as the map key
+     * @return true if no facade was registered under the cache name before this call.
+     */
+    public synchronized boolean addNoWaitFacade( LateralCacheNoWaitFacade facade, String cacheName )
+    {
+        boolean isNew = !facades.containsKey( cacheName );
+
+        // override or put anew, it doesn't matter
+        facades.put( cacheName, facade );
+
+        return isNew;
+    }
+
+    /**
+     * When a broadcast is received from the UDP Discovery receiver, for each cacheName in the
+     * message, the add no wait will be called here. To add a no wait, the facade is looked up for
+     * this cache name.
+     * <p>
+     * @param noWait
+     */
+    protected void addNoWait( LateralCacheNoWait noWait )
+    {
+        LateralCacheNoWaitFacade facade = (LateralCacheNoWaitFacade) facades.get( noWait.getCacheName() );
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "Got facade for " + noWait.getCacheName() + " = " + facade );
+        }
+
+        if ( facade != null )
+        {
+            boolean isNew = facade.addNoWait( noWait );
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "Called addNoWait, isNew = " + isNew );
+            }
+        }
+        else
+        {
+            if ( log.isInfoEnabled() )
+            {
+                log.info( "Different nodes are configured differently.  Region [" + noWait.getCacheName()
+                    + "] is not configured to use the lateral cache." );
+            }
+        }
+    }
+
+    /**
+     * Look up the facade for the no wait's cache name. If there is none, the region is not
+     * configured for use with the lateral cache. Otherwise remove the no wait from the facade.
+     * <p>
+     * @param noWait the no wait to remove from the facade registered for its cache name
+     */
+    protected void removeNoWait( LateralCacheNoWait noWait )
+    {
+        LateralCacheNoWaitFacade facade = (LateralCacheNoWaitFacade) facades.get( noWait.getCacheName() );
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "Got facade for " + noWait.getCacheName() + " = " + facade );
+        }
+
+        if ( facade != null )
+        {
+            boolean removed = facade.removeNoWait( noWait );
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "Called removeNoWait, removed = " + removed );
+            }
+        }
+        else
+        {
+            if ( log.isInfoEnabled() )
+            {
+                log.info( "Different nodes are configured differently.  Region [" + noWait.getCacheName()
+                    + "] is not configured to use the lateral cache." );
+            }
+        }
+    }
+
+    /**
+     * Creates the lateral cache if needed.
+     * <p>
+     * @param service
+     */
+    public void addDiscoveredService( DiscoveredService service )
+    {
+        // get a cache and add it to the no waits
+        // the add method should not add the same.
+        // we need the listener port from the original config.
+        ITCPLateralCacheAttributes lca = new TCPLateralCacheAttributes();
+        lca.setTransmissionType( LateralCacheAttributes.TCP );
+        lca.setTcpServer( service.getServiceAddress() + ":" + service.getServicePort() );
+        LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance( lca, cacheMgr, cacheEventLogger,
+                                                                         elementSerializer );
+
+        ArrayList regions = service.getCacheNames();
+        if ( regions != null )
+        {
+            // for each region get the cache
+            Iterator it = regions.iterator();
+            while ( it.hasNext() )
+            {
+                String cacheName = (String) it.next();
+
+                try
+                {
+                    ICache ic = lcm.getCache( cacheName );
+
+                    if ( log.isDebugEnabled() )
+                    {
+                        log.debug( "Got cache, ic = " + ic );
+                    }
+
+                    // add this to the nowaits for this cachename
+                    if ( ic != null )
+                    {
+                        addNoWait( (LateralCacheNoWait) ic );
+                        if ( log.isDebugEnabled() )
+                        {
+                            log.debug( "Called addNoWait for cacheName " + cacheName );
+                        }
+                    }
+                }
+                catch ( Exception e )
+                {
+                    log.error( "Problem creating no wait", e );
+                }
+            }
+            // end while
+        }
+        else
+        {
+            log.warn( "No cache names found in message " + service );
+        }
+    }
+
+    /**
+     * Intended to remove the lateral cache for the service; not implemented yet, so this is a no-op.
+     * @param service the discovered service to remove
+     */
+    public void removeDiscoveredService( DiscoveredService service )
+    {
+        // TODO not implemented: nothing is removed from the no wait facades here yet.
+    }
+}

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPSender.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPSender.java?rev=794825&r1=794824&r2=794825&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPSender.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPSender.java Thu Jul 16 20:14:42 2009
@@ -32,7 +32,7 @@
 import org.apache.jcs.auxiliary.lateral.socket.tcp.utils.SocketOpener;
 
 /**
- * This class is based on the log4j SocketAppender class. I'm using a differnet repair structure, so
+ * This class is based on the log4j SocketAppender class. I'm using a different repair structure, so
  * it is significantly different.
  */
 public class LateralTCPSender
@@ -66,7 +66,7 @@
 
     // reset the ObjectOutputStream every 70 calls
     // private static final int RESET_FREQUENCY = 70;
-    // Perhaps we need to resett every time until we move to jdk 1.4
+    // Perhaps we need to reset every time until we move to jdk 1.4
     // then we can call writeUnshared to make sure
     // that the object definetely gets across and not
     // a stream cached version.
@@ -270,7 +270,7 @@
             // Synchronized to insure that the get requests to server from this
             // sender and the responses are processed in order, else you could
             // return the wrong item from the cache.
-            // This is a big block of code. May need to rethink this strategy.
+            // This is a big block of code. May need to re-think this strategy.
             // This may not be necessary.
             // Normal puts, etc to laterals do not have to be synchronized.
             synchronized ( this.getLock )
@@ -307,8 +307,7 @@
                         String message = "Could not open ObjectInputStream to " + socket;
                         if ( socket != null )
                         {
-                            message += " SoTimeout [" + socket.getSoTimeout() + "]";
-                            // this is 1.4 specific -- Connected [" + socket.isConnected() + "]";
+                            message += " SoTimeout [" + socket.getSoTimeout() + "] Connected [" + socket.isConnected() + "]";
                         }
                         log.error( message, ioe );
                         throw ioe;
@@ -322,8 +321,7 @@
                     {
                         counter = 0;
                         // Failing to reset the object output stream every now
-                        // and
-                        // then creates a serious memory leak.
+                        // and then creates a serious memory leak.
                         log.info( "Doing oos.reset()" );
                         oos.reset();
                     }

Added: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/DiscoveredService.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/DiscoveredService.java?rev=794825&view=auto
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/DiscoveredService.java (added)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/DiscoveredService.java Thu Jul 16 20:14:42 2009
@@ -0,0 +1,140 @@
+package org.apache.jcs.utils.discovery;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * This contains info about a discovered service. These objects are stored in a set in the
+ * UDPDiscoveryService.
+ * <p>
+ * @author Aaron Smuts
+ */
+public class DiscoveredService
+    implements Serializable
+{
+    /** For serialization. Don't change. */
+    private static final long serialVersionUID = -7810164772089509751L;
+
+    /** region names */
+    private ArrayList cacheNames;
+
+    /** service address */
+    private String serviceAddress;
+
+    /** service port */
+    private int servicePort;
+
+    /** last time we heard from this service? */
+    private long lastHearFromTime = 0;
+
+    /**
+     * @param cacheNames the cacheNames to set
+     */
+    public void setCacheNames( ArrayList cacheNames )
+    {
+        this.cacheNames = cacheNames;
+    }
+
+    /**
+     * @return the cacheNames
+     */
+    public ArrayList getCacheNames()
+    {
+        return cacheNames;
+    }
+
+    /**
+     * @param serviceAddress The serviceAddress to set.
+     */
+    public void setServiceAddress( String serviceAddress )
+    {
+        this.serviceAddress = serviceAddress;
+    }
+
+    /**
+     * @return Returns the serviceAddress.
+     */
+    public String getServiceAddress()
+    {
+        return serviceAddress;
+    }
+
+    /**
+     * @param servicePort The servicePort to set.
+     */
+    public void setServicePort( int servicePort )
+    {
+        this.servicePort = servicePort;
+    }
+
+    /**
+     * @return Returns the servicePort.
+     */
+    public int getServicePort()
+    {
+        return servicePort;
+    }
+
+    /**
+     * @param lastHearFromTime The lastHearFromTime to set.
+     */
+    public void setLastHearFromTime( long lastHearFromTime )
+    {
+        this.lastHearFromTime = lastHearFromTime;
+    }
+
+    /**
+     * @return Returns the lastHearFromTime.
+     */
+    public long getLastHearFromTime()
+    {
+        return lastHearFromTime;
+    }
+
+    /** @return hashcode based on address/port/name */
+    public int hashCode()
+    {
+        HashCodeBuilder builder = new HashCodeBuilder();
+        builder.append( this.getServiceAddress() );
+        builder.append( this.getServicePort() );
+        builder.append( this.getCacheNames() );
+        return builder.toHashCode();
+    }
+
+    /**
+     * NOTE - this object is often put into sets, so equals needs to be overridden.
+     * <p>
+     * @param otherArg other
+     * @return equality based on the address/port/name
+     */
+    public boolean equals( Object otherArg )
+    {
+        if ( otherArg instanceof DiscoveredService )
+        {
+            DiscoveredService other = (DiscoveredService) otherArg;
+            EqualsBuilder builder = new EqualsBuilder();
+            builder.append( this.getServiceAddress(), other.getServiceAddress() );
+            builder.append( this.getServicePort(), other.getServicePort() );
+            builder.append( this.getCacheNames(), other.getCacheNames() );
+            return builder.isEquals();
+        }
+        return false;
+    }
+
+    /**
+     * @return string for debugging purposes.
+     */
+    public String toString()
+    {
+        StringBuffer buf = new StringBuffer();
+        buf.append( "\n DiscoveredService" );
+        buf.append( "\n CacheNames = [" + getCacheNames() + "]" );
+        buf.append( "\n ServiceAddress = [" + getServiceAddress() + "]" );
+        buf.append( "\n ServicePort = [" + getServicePort() + "]" );
+        buf.append( "\n LastHearFromTime = [" + getLastHearFromTime() + "]" );
+        return buf.toString();
+    }
+}

Added: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/DiscoveryShutdownHook.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/DiscoveryShutdownHook.java?rev=794825&view=auto
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/DiscoveryShutdownHook.java (added)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/DiscoveryShutdownHook.java Thu Jul 16 20:14:42 2009
@@ -0,0 +1,50 @@
+package org.apache.jcs.utils.discovery;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Calls shutdown on the discovery service when the vm is exiting. On an orderly shutdown, this
+ * will result in a remove message being issued.
+ * <p>
+ * The service is also registered with the composite cache manager to receive shutdown notification.
+ * <p>
+ * @author Aaron Smuts
+ */
+public class DiscoveryShutdownHook
+    extends Thread
+{
+    /** log instance */
+    private static final Log log = LogFactory.getLog( DiscoveryShutdownHook.class );
+
+    /** service to shut down */
+    private UDPDiscoveryService service;
+
+    /**
+     * @param serviceArg service
+     */
+    public DiscoveryShutdownHook( UDPDiscoveryService serviceArg )
+    {
+        this.service = serviceArg;
+    }
+
+    /**
+     * Just calls shutdown on the service.
+     */
+    public void run()
+    {
+        if ( log.isInfoEnabled() )
+        {
+            log.info( "UDP Discovery shutdown hook called." );
+        }
+
+        if ( service != null )
+        {
+            service.shutdown();
+        }
+        else
+        {
+            log.warn( "UDP Discovery Service was null." );
+        }
+    }
+}

Added: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPCleanupRunner.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPCleanupRunner.java?rev=794825&view=auto
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPCleanupRunner.java (added)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPCleanupRunner.java Thu Jul 16 20:14:42 2009
@@ -0,0 +1,82 @@
+package org.apache.jcs.utils.discovery;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This class periodically checks the last-heard-from time of the discovered services.
+ * <p>
+ * If they exceed the configurable limit, it removes them from the set.
+ * <p>
+ * @author Aaron Smuts
+ */
+public class UDPCleanupRunner
+    implements Runnable
+{
+    /** log instance */
+    private static final Log log = LogFactory.getLog( UDPCleanupRunner.class );
+
+    /** UDP discovery service */
+    private UDPDiscoveryService discoveryService;
+
+    /** default for max idle time, in seconds */
+    private static final long DEFAULT_MAX_IDLE_TIME_SECONDS = 180;
+
+    /** The configured max idle time, in seconds */
+    private long maxIdleTimeSeconds = DEFAULT_MAX_IDLE_TIME_SECONDS;
+
+    /**
+     * @param service UDPDiscoveryService
+     */
+    public UDPCleanupRunner( UDPDiscoveryService service )
+    {
+        this.discoveryService = service;
+    }
+
+    /**
+     * This goes through the list of services and removes those that we haven't heard from in longer
+     * than the max idle time.
+     * <p>
+     * @see java.lang.Runnable#run()
+     */
+    public void run()
+    {
+        long now = System.currentTimeMillis();
+
+        // iterate through the set
+        // it is thread safe
+        // http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/CopyOnWriteArraySet.
+        // html
+        // TODO this should get a copy.  you can't simply remove from this.
+        // the listeners need to be notified.
+        Iterator it = discoveryService.getDiscoveredServices().iterator();
+
+        Set toRemove = new HashSet();
+        // can't remove via the iterator. must remove directly
+        while ( it.hasNext() )
+        {
+            DiscoveredService service = (DiscoveredService) it.next();
+            if ( ( now - service.getLastHearFromTime() ) > ( maxIdleTimeSeconds * 1000 ) )
+            {
+                if ( log.isInfoEnabled() )
+                {
+                    log.info( "Removing service, since we haven't heard from it in " + maxIdleTimeSeconds
+                        + " seconds.  service = " + service );
+                }
+                toRemove.add( service );
+            }
+        }
+
+        // remove the bad ones
+        Iterator toRemoveIt = toRemove.iterator();
+        while ( toRemoveIt.hasNext() )
+        {
+            // call this so the listeners get notified
+            discoveryService.removeDiscoveredService( (DiscoveredService) toRemoveIt.next() );
+        }
+    }
+}

Added: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryAttributes.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryAttributes.java?rev=794825&view=auto
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryAttributes.java (added)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryAttributes.java Thu Jul 16 20:14:42 2009
@@ -0,0 +1,210 @@
+package org.apache.jcs.utils.discovery;
+
+/**
+ * Configuration properties for the UDP discovery service.
+ * <p>
+ * The service allows our applications to find each other.
+ * <p>
+ * @author Aaron Smuts
+ */
+public class UDPDiscoveryAttributes
+    implements Cloneable
+{
+    /** service name */
+    private String serviceName;
+
+    /** service address */
+    private String serviceAddress;
+
+    /** service port, as opposed to the udp discovery port */
+    private int servicePort;
+
+    /**
+     * Documented as: false -> this service instance is not ready to receive requests, true -> ready
+     * for use. NOTE(review): the field name suggests the opposite polarity; confirm with callers.
+     */
+    private boolean isDark;
+
+    /** default udp discovery address */
+    private static final String DEFAULT_UDP_DISCOVERY_ADDRESS = "228.4.5.6";
+
+    /** default udp discovery port */
+    private static final int DEFAULT_UDP_DISCOVERY_PORT = 5678;
+
+    /** udp discovery address, starts out as the default */
+    private String udpDiscoveryAddr = DEFAULT_UDP_DISCOVERY_ADDRESS;
+
+    /** udp discovery port, starts out as the default */
+    private int udpDiscoveryPort = DEFAULT_UDP_DISCOVERY_PORT;
+
+    /** default delay, in seconds, between sending passive broadcasts */
+    private static final int DEFAULT_SEND_DELAY_SEC = 60;
+
+    /** delay, in seconds, between sending passive broadcasts */
+    private int sendDelaySec = DEFAULT_SEND_DELAY_SEC;
+
+    /** default amount of time, in seconds, before we remove services that we haven't heard from */
+    private static final int DEFAULT_MAX_IDLE_TIME_SEC = 180;
+
+    /** amount of time, in seconds, before we remove services that we haven't heard from */
+    private int maxIdleTimeSec = DEFAULT_MAX_IDLE_TIME_SEC;
+
+    /**
+     * @param serviceName The serviceName to set.
+     */
+    public void setServiceName( String serviceName )
+    {
+        this.serviceName = serviceName;
+    }
+
+    /**
+     * @return Returns the serviceName, null until it has been set.
+     */
+    public String getServiceName()
+    {
+        return serviceName;
+    }
+
+    /**
+     * @param serviceAddress The serviceAddress to set.
+     */
+    public void setServiceAddress( String serviceAddress )
+    {
+        this.serviceAddress = serviceAddress;
+    }
+
+    /**
+     * @return Returns the serviceAddress, null until it has been set.
+     */
+    public String getServiceAddress()
+    {
+        return serviceAddress;
+    }
+
+    /**
+     * @param servicePort The servicePort to set.
+     */
+    public void setServicePort( int servicePort )
+    {
+        this.servicePort = servicePort;
+    }
+
+    /**
+     * @return Returns the servicePort, 0 until it has been set.
+     */
+    public int getServicePort()
+    {
+        return servicePort;
+    }
+
+    /**
+     * @param udpDiscoveryAddr The udpDiscoveryAddr to set.
+     */
+    public void setUdpDiscoveryAddr( String udpDiscoveryAddr )
+    {
+        this.udpDiscoveryAddr = udpDiscoveryAddr;
+    }
+
+    /**
+     * @return Returns the udpDiscoveryAddr, the default address unless it has been changed.
+     */
+    public String getUdpDiscoveryAddr()
+    {
+        return udpDiscoveryAddr;
+    }
+
+    /**
+     * @param udpDiscoveryPort The udpDiscoveryPort to set.
+     */
+    public void setUdpDiscoveryPort( int udpDiscoveryPort )
+    {
+        this.udpDiscoveryPort = udpDiscoveryPort;
+    }
+
+    /**
+     * @return Returns the udpDiscoveryPort, the default port unless it has been changed.
+     */
+    public int getUdpDiscoveryPort()
+    {
+        return udpDiscoveryPort;
+    }
+
+    /**
+     * @param sendDelaySec The sendDelaySec to set, in seconds.
+     */
+    public void setSendDelaySec( int sendDelaySec )
+    {
+        this.sendDelaySec = sendDelaySec;
+    }
+
+    /**
+     * @return Returns the sendDelaySec, in seconds.
+     */
+    public int getSendDelaySec()
+    {
+        return sendDelaySec;
+    }
+
+    /**
+     * @param maxIdleTimeSec The maxIdleTimeSec to set, in seconds.
+     */
+    public void setMaxIdleTimeSec( int maxIdleTimeSec )
+    {
+        this.maxIdleTimeSec = maxIdleTimeSec;
+    }
+
+    /**
+     * @return Returns the maxIdleTimeSec, in seconds.
+     */
+    public int getMaxIdleTimeSec()
+    {
+        return maxIdleTimeSec;
+    }
+
+    /**
+     * @return Returns the isDark.
+     */
+    public boolean isDark()
+    {
+        return isDark;
+    }
+
+    /**
+     * @param isDark The isDark to set.
+     */
+    public void setDark( boolean isDark )
+    {
+        this.isDark = isDark;
+    }
+
+    /** @return a copy of every field, serviceAddress included (all fields are primitives or Strings) */
+    public Object clone()
+    {
+        try
+        {
+            return super.clone();
+        }
+        catch ( CloneNotSupportedException e )
+        {
+            throw new IllegalStateException( "UDPDiscoveryAttributes is Cloneable but could not be cloned: " + e );
+        }
+    }
+
+    /**
+     * @return string for debugging purposes.
+     */
+    public String toString()
+    {
+        StringBuffer buf = new StringBuffer();
+        buf.append( "\n UDPDiscoveryAttributes" );
+        buf.append( "\n ServiceName = [" + getServiceName() + "]" );
+        buf.append( "\n ServiceAddress = [" + getServiceAddress() + "]" );
+        buf.append( "\n ServicePort = [" + getServicePort() + "]" );
+        buf.append( "\n UdpDiscoveryAddr = [" + getUdpDiscoveryAddr() + "]" );
+        buf.append( "\n UdpDiscoveryPort = [" + getUdpDiscoveryPort() + "]" );
+        buf.append( "\n SendDelaySec = [" + getSendDelaySec() + "]" );
+        buf.append( "\n MaxIdleTimeSec = [" + getMaxIdleTimeSec() + "]" );
+        buf.append( "\n IsDark = [" + isDark() + "]" );
+        return buf.toString();
+    }
+}

Added: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryInfo.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryInfo.java?rev=794825&view=auto
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryInfo.java (added)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryInfo.java Thu Jul 16 20:14:42 2009
@@ -0,0 +1,21 @@
+package org.apache.jcs.utils.discovery;
+
+import java.rmi.dgc.VMID;
+
+/**
+ * Provides static identity info for the udp discovery service.
+ * <p>
+ * @author Aaron Smuts
+ */
+public class UDPDiscoveryInfo
+{
+    /**
+     * jvm unique identifier, created when this class is initialized; must stay declared before listenerId.
+     */
+    protected static VMID vmid = new VMID();
+
+    /**
+     * Identifies the listener, so we don't add ourselves to the list of known services. This is the int hash code of vmid widened to long.
+     */
+    public static long listenerId = vmid.hashCode();
+}

Copied: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java (from r781592, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryManager.java)
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java?p2=jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java&p1=jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryManager.java&r1=781592&r2=794825&rev=794825&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryManager.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java Thu Jul 16 20:14:42 2009
@@ -1,4 +1,4 @@
-package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
+package org.apache.jcs.utils.discovery;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -24,9 +24,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
 import org.apache.jcs.engine.behavior.IElementSerializer;
+import org.apache.jcs.engine.behavior.IShutdownObservable;
 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
 
 /**
@@ -63,31 +63,10 @@
     }
 
     /**
-     * Returns the UDP Discovery service associated with this instance.
-     * <p>
-     * @param lca ITCPLateralCacheAttributes
-     * @param cacheMgr
-     * @param cacheEventLogger
-     * @param elementSerializer
-     * @return instance for this address
-     */
-    public synchronized UDPDiscoveryService getService( ITCPLateralCacheAttributes lca,
-                                                        ICompositeCacheManager cacheMgr,
-                                                        ICacheEventLogger cacheEventLogger,
-                                                        IElementSerializer elementSerializer )
-    {
-        UDPDiscoveryService service = getService( lca.getUdpDiscoveryAddr(), lca.getUdpDiscoveryPort(), lca
-            .getTcpListenerPort(), cacheMgr, cacheEventLogger, elementSerializer );
-
-        // TODO find a way to remote these attributes from the service, the manager needs it on disocvery.
-        service.setTcpLateralCacheAttributes( lca );
-        return service;
-    }
-
-    /**
      * Creates a service for the address and port if one doesn't exist already.
      * <p>
-     * TODO we may need to key this using the listener port too
+     * We need to key this using the listener port too. TODO think of making one discovery service
+     * work for multiple types of clients.
      * <p>
      * @param discoveryAddress
      * @param discoveryPort
@@ -97,23 +76,29 @@
      * @param elementSerializer
      * @return UDPDiscoveryService
      */
-    private synchronized UDPDiscoveryService getService( String discoveryAddress, int discoveryPort, int servicePort,
-                                                         ICompositeCacheManager cacheMgr,
-                                                         ICacheEventLogger cacheEventLogger,
-                                                         IElementSerializer elementSerializer )
+    public synchronized UDPDiscoveryService getService( String discoveryAddress, int discoveryPort, int servicePort,
+                                                        ICompositeCacheManager cacheMgr,
+                                                        ICacheEventLogger cacheEventLogger,
+                                                        IElementSerializer elementSerializer )
     {
-        String key = discoveryAddress + ":" + discoveryPort;
+        String key = discoveryAddress + ":" + discoveryPort + ":" + servicePort;
 
         UDPDiscoveryService service = (UDPDiscoveryService) services.get( key );
         if ( service == null )
         {
             if ( log.isInfoEnabled() )
             {
-                log.info( "Creating service for address:port [" + key + "]" );
+                log.info( "Creating service for address:port:servicePort [" + key + "]" );
             }
 
-            service = new UDPDiscoveryService( discoveryAddress, discoveryPort, servicePort, cacheMgr,
-                                               cacheEventLogger, elementSerializer );
+            UDPDiscoveryAttributes attributes = new UDPDiscoveryAttributes();
+            attributes.setUdpDiscoveryAddr( discoveryAddress );
+            attributes.setUdpDiscoveryPort( discoveryPort );
+            attributes.setServicePort( servicePort );
+
+            service = new UDPDiscoveryService( attributes, cacheEventLogger );
+            ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( service );
+
             services.put( key, service );
         }
 

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java
------------------------------------------------------------------------------
    cvs2svn:cvs-rev = 1.2

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryMessage.java (from r781592, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryMessage.java)
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryMessage.java?p2=jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryMessage.java&p1=jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryMessage.java&r1=781592&r2=794825&rev=794825&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryMessage.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryMessage.java Thu Jul 16 20:14:42 2009
@@ -1,4 +1,4 @@
-package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
+package org.apache.jcs.utils.discovery;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -30,7 +30,7 @@
     implements Serializable
 {
     /** Don't change */
-    private static final long serialVersionUID = -5332377899560951794L;
+    private static final long serialVersionUID = -5332377899560951793L;
 
     /**
      * This is the periodic broadcast of a servers location. This type of message is also sent in
@@ -42,6 +42,11 @@
      * This asks recipients to broadcast their location. This is used on startup.
      */
     public static final int REQUEST_BROADCAST = 1;
+    
+    /**
+     * This message instructs the receiver to remove this service from its list.
+     */
+    public static final int REMOVE_BROADCAST = 2;    
 
     /** The message type */
     private int messageType = PASSIVE_BROADCAST;
@@ -57,7 +62,7 @@
 
     /** Names of regions */
     private ArrayList cacheNames = new ArrayList();
-
+    
     /**
      * @param port The port to set.
      */
@@ -137,7 +142,7 @@
     {
         return cacheNames;
     }
-
+    
     /**
      * @return debugging string
      */
@@ -148,7 +153,6 @@
         buf.append( "\n port = [" + port + "]" );
         buf.append( "\n requesterId = [" + requesterId + "]" );
         buf.append( "\n messageType = [" + messageType + "]" );
-
         buf.append( "\n Cache Names" );
         Iterator it = cacheNames.iterator();
         while ( it.hasNext() )

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryMessage.java
------------------------------------------------------------------------------
    cvs2svn:cvs-rev = 1.2

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryMessage.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryMessage.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java (from r792552, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryReceiver.java)
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java?p2=jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java&p1=jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryReceiver.java&r1=792552&r2=794825&rev=794825&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoveryReceiver.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java Thu Jul 16 20:14:42 2009
@@ -1,4 +1,4 @@
-package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
+package org.apache.jcs.utils.discovery;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -25,22 +25,11 @@
 import java.net.DatagramPacket;
 import java.net.InetAddress;
 import java.net.MulticastSocket;
-import java.util.ArrayList;
-import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.jcs.auxiliary.lateral.LateralCacheAttributes;
 import org.apache.jcs.auxiliary.lateral.LateralCacheInfo;
-import org.apache.jcs.auxiliary.lateral.LateralCacheNoWait;
-import org.apache.jcs.auxiliary.lateral.socket.tcp.LateralTCPCacheManager;
-import org.apache.jcs.auxiliary.lateral.socket.tcp.TCPLateralCacheAttributes;
-import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
-import org.apache.jcs.engine.behavior.ICache;
-import org.apache.jcs.engine.behavior.ICompositeCacheManager;
-import org.apache.jcs.engine.behavior.IElementSerializer;
 import org.apache.jcs.engine.behavior.IShutdownObserver;
-import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
 
 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
@@ -54,16 +43,16 @@
     private final static Log log = LogFactory.getLog( UDPDiscoveryReceiver.class );
 
     /** buffer */
-    private final byte[] m_buffer = new byte[65536];
+    private final byte[] mBuffer = new byte[65536];
 
     /** The socket used for communication. */
-    private MulticastSocket m_socket;
+    private MulticastSocket mSocket;
 
     /**
      * TODO: Consider using the threadpool manager to get this thread pool. For now place a tight
      * restriction on the pool size
      */
-    private static final int maxPoolSize = 10;
+    private static final int maxPoolSize = 2;
 
     /** The processor */
     private PooledExecutor pooledExecutor = null;
@@ -80,18 +69,9 @@
     /** The port */
     private int multicastPort = 0;
 
-    /** The cache manager. */
-    private ICompositeCacheManager cacheMgr;
-
     /** Is it shutdown. */
     private boolean shutdown = false;
 
-    /** The event logger. */
-    protected ICacheEventLogger cacheEventLogger;
-
-    /** The serializer. */
-    protected IElementSerializer elementSerializer;
-
     /**
      * Constructor for the LateralUDPReceiver object.
      * <p>
@@ -100,32 +80,24 @@
      * @param service
      * @param multicastAddressString
      * @param multicastPort
-     * @param cacheMgr
-     * @param cacheEventLogger
-     * @param elementSerializer
      * @exception IOException
      */
-    public UDPDiscoveryReceiver( UDPDiscoveryService service, String multicastAddressString, int multicastPort,
-                                 ICompositeCacheManager cacheMgr, ICacheEventLogger cacheEventLogger,
-                                 IElementSerializer elementSerializer )
+    public UDPDiscoveryReceiver( UDPDiscoveryService service, String multicastAddressString, int multicastPort )
         throws IOException
     {
         this.service = service;
         this.multicastAddressString = multicastAddressString;
         this.multicastPort = multicastPort;
-        this.cacheMgr = cacheMgr;
-        this.cacheEventLogger = cacheEventLogger;
-        this.elementSerializer = elementSerializer;
 
-        // create a small thread pool to handle a barage
+        // create a small thread pool to handle a barrage
         pooledExecutor = new PooledExecutor( new BoundedBuffer( 100 ), maxPoolSize );
         pooledExecutor.discardOldestWhenBlocked();
-        //pooledExecutor.setMinimumPoolSize(1);
+        pooledExecutor.setMinimumPoolSize(1);
         pooledExecutor.setThreadFactory( new MyThreadFactory() );
 
         if ( log.isInfoEnabled() )
         {
-            log.info( "constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
+            log.info( "Constructing listener, [" + this.multicastAddressString + ":" + this.multicastPort + "]" );
         }
 
         try
@@ -134,7 +106,7 @@
         }
         catch ( IOException ioe )
         {
-            // consider eatign this so we can go on, or constructing the socket
+            // consider eating this so we can go on, or constructing the socket
             // later
             throw ioe;
         }
@@ -142,6 +114,7 @@
 
     /**
      * Creates the socket for this class.
+     * <p>
      * @param multicastAddressString
      * @param multicastPort
      * @throws IOException
@@ -151,8 +124,8 @@
     {
         try
         {
-            m_socket = new MulticastSocket( multicastPort );
-            m_socket.joinGroup( InetAddress.getByName( multicastAddressString ) );
+            mSocket = new MulticastSocket( multicastPort );
+            mSocket.joinGroup( InetAddress.getByName( multicastAddressString ) );
         }
         catch ( IOException e )
         {
@@ -162,27 +135,37 @@
     }
 
     /**
-     * Highly unreliable. If it is processing one message while another comes in , the second
-     * message is lost. This is for low concurency peppering.
+     * Highly unreliable. If it is processing one message while another comes in, the second
+     * message is lost. This is for low concurrency peppering.
+     * <p>
      * @return the object message
      * @throws IOException
      */
     public Object waitForMessage()
         throws IOException
     {
-        final DatagramPacket packet = new DatagramPacket( m_buffer, m_buffer.length );
+        final DatagramPacket packet = new DatagramPacket( mBuffer, mBuffer.length );
 
         Object obj = null;
         try
         {
-            m_socket.receive( packet );
+            mSocket.receive( packet );
 
-            final ByteArrayInputStream byteStream = new ByteArrayInputStream( m_buffer, 0, packet.getLength() );
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "Received packet from address [" + packet.getSocketAddress() + "]" );
+            }
+            
+            final ByteArrayInputStream byteStream = new ByteArrayInputStream( mBuffer, 0, packet.getLength() );
 
             final ObjectInputStream objectStream = new ObjectInputStream( byteStream );
 
             obj = objectStream.readObject();
-
+            
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "Read object from address [" + packet.getSocketAddress() + "], object=[" + obj + "]" );
+            }            
         }
         catch ( Exception e )
         {
@@ -275,7 +258,7 @@
     public class MessageHandler
         implements Runnable
     {
-        /** The message to handle.  Passed in during construction. */
+        /** The message to handle. Passed in during construction. */
         private UDPDiscoveryMessage message = null;
 
         /**
@@ -296,92 +279,64 @@
             {
                 if ( log.isDebugEnabled() )
                 {
-                    log.debug( "from self" );
+                    log.debug( "Ignoring message sent from self" );
                 }
             }
             else
             {
                 if ( log.isDebugEnabled() )
                 {
-                    log.debug( "from another" );
+                    log.debug( "Process message sent from another" );
                     log.debug( "Message = " + message );
                 }
 
-                // if this is a request message, have the service handle it and
-                // return
-                if ( message.getMessageType() == UDPDiscoveryMessage.REQUEST_BROADCAST )
+                if ( message.getHost() == null || message.getCacheNames() == null || message.getCacheNames().isEmpty() )
                 {
                     if ( log.isDebugEnabled() )
                     {
-                        log.debug( "Message is a Request Broadcase, will have the service handle it." );
+                        log.debug( "Ignoring invalid message: " + message );
                     }
-                    service.serviceRequestBroadcast();
-                    return;
                 }
-
-                try
+                else
                 {
-                    // get a cache and add it to the no waits
-                    // the add method should not add the same.
-                    // we need the listener port from the original config.
-                    ITCPLateralCacheAttributes lca = null;
-                    if ( service.getTcpLateralCacheAttributes() != null )
-                    {
-                        lca = (ITCPLateralCacheAttributes) service.getTcpLateralCacheAttributes().copy();
-                    }
-                    else
-                    {
-                        lca = new TCPLateralCacheAttributes();
-                    }
-                    lca.setTransmissionType( LateralCacheAttributes.TCP );
-                    lca.setTcpServer( message.getHost() + ":" + message.getPort() );
-                    LateralTCPCacheManager lcm = LateralTCPCacheManager.getInstance( lca, cacheMgr, cacheEventLogger,
-                                                                                     elementSerializer );
-
-                    ArrayList regions = message.getCacheNames();
-                    if ( regions != null )
-                    {
-                        // for each region get the cache
-                        Iterator it = regions.iterator();
-                        while ( it.hasNext() )
-                        {
-                            String cacheName = (String) it.next();
+                    processMessage();
+                }
+            }
+        }
 
-                            try
-                            {
-                                ICache ic = lcm.getCache( cacheName );
-
-                                if ( log.isDebugEnabled() )
-                                {
-                                    log.debug( "Got cache, ic = " + ic );
-                                }
-
-                                // add this to the nowaits for this cachename
-                                if ( ic != null )
-                                {
-                                    service.addNoWait( (LateralCacheNoWait) ic );
-                                    if ( log.isDebugEnabled() )
-                                    {
-                                        log.debug( "Called addNoWait for cacheName " + cacheName );
-                                    }
-                                }
-                            }
-                            catch ( Exception e )
-                            {
-                                log.error( "Problem creating no wait", e );
-                            }
-                        }
-                        // end while
-                    }
-                    else
-                    {
-                        log.warn( "No cache names found in message " + message );
-                    }
+        /**
+         * Process the incoming message.
+         */
+        private void processMessage()
+        {
+            DiscoveredService discoveredService = new DiscoveredService();
+            discoveredService.setServiceAddress( message.getHost() );
+            discoveredService.setCacheNames( message.getCacheNames() );
+            discoveredService.setServicePort( message.getPort() );
+            discoveredService.setLastHearFromTime( System.currentTimeMillis() );
+
+            // if this is a request message, have the service handle it and
+            // return
+            if ( message.getMessageType() == UDPDiscoveryMessage.REQUEST_BROADCAST )
+            {
+                if ( log.isDebugEnabled() )
+                {
+                    log.debug( "Message is a Request Broadcase, will have the service handle it." );
                 }
-                catch ( Exception e )
+                service.serviceRequestBroadcast();
+                return;
+            }
+            else if ( message.getMessageType() == UDPDiscoveryMessage.REMOVE_BROADCAST )
+            {
+                if ( log.isDebugEnabled() )
                 {
-                    log.error( "Problem getting lateral maanger", e );
+                    log.debug( "Removing service from set " + discoveredService );
                 }
+                service.removeDiscoveredService( discoveredService );
+            }
+            else
+            {
+                service.addOrUpdateService( discoveredService );
             }
         }
     }
@@ -415,7 +370,7 @@
         try
         {
             shutdown = true;
-            m_socket.close();
+            mSocket.close();
             pooledExecutor.shutdownNow();
         }
         catch ( Exception e )
@@ -424,4 +379,3 @@
         }
     }
 }
-// end class

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java
------------------------------------------------------------------------------
    cvs2svn:cvs-rev = 1.3

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryReceiver.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java (from r781592, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoverySender.java)
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java?p2=jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java&p1=jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoverySender.java&r1=781592&r2=794825&rev=794825&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoverySender.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java Thu Jul 16 20:14:42 2009
@@ -1,4 +1,4 @@
-package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
+package org.apache.jcs.utils.discovery;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -42,13 +42,13 @@
     private final static Log log = LogFactory.getLog( UDPDiscoverySender.class );
 
     /** The socket */
-    private MulticastSocket m_localSocket;
+    private MulticastSocket localSocket;
 
     /** The address */
-    private InetAddress m_multicastAddress;
+    private InetAddress multicastAddress;
 
     /** The port */
-    private int m_multicastPort;
+    private int multicastPort;
 
     /**
      * Constructor for the UDPDiscoverySender object
@@ -56,6 +56,7 @@
      * This sender can be used to send multiple messages.
      * <p>
      * When you are done sending, you should destroy the socket sender.
+     * <p>
      * @param host
      * @param port
      * @exception IOException
@@ -65,10 +66,14 @@
     {
         try
         {
-            m_localSocket = new MulticastSocket();
+            if ( log.isInfoEnabled() )
+            {
+                log.info( "Constructing socket for sender." );
+            }            
+            localSocket = new MulticastSocket();
 
             // Remote address.
-            m_multicastAddress = InetAddress.getByName( host );
+            multicastAddress = InetAddress.getByName( host );
         }
         catch ( IOException e )
         {
@@ -77,7 +82,7 @@
             throw e;
         }
 
-        m_multicastPort = port;
+        multicastPort = port;
     }
 
     /**
@@ -87,11 +92,9 @@
     {
         try
         {
-            // TODO when we move to jdk 1.4 reinstate the isClosed check
-            if ( this.m_localSocket != null )
-            // && !this.m_localSocket.isClosed() )
+            if ( this.localSocket != null && !this.localSocket.isClosed() )
             {
-                this.m_localSocket.close();
+                this.localSocket.close();
             }
         }
         catch ( Exception e )
@@ -102,6 +105,7 @@
 
     /**
      * Just being careful about closing the socket.
+     * <p>
      * @throws Throwable
      */
     public void finalize()
@@ -113,27 +117,27 @@
 
     /**
      * Send messages.
+     * <p>
      * @param message
      * @throws IOException
      */
     public void send( UDPDiscoveryMessage message )
         throws IOException
     {
-        if ( this.m_localSocket == null )
+        if ( this.localSocket == null )
         {
             throw new IOException( "Socket is null, cannot send message." );
         }
 
-        // TODO when we move to jdk 1.4 reinstate the isClosed check
-        // if (this.m_localSocket.isClosed() )
-        // {
-        // throw new IOException( "Socket is closed, cannot send message." );
-        // }
+        if ( this.localSocket.isClosed() )
+        {
+            throw new IOException( "Socket is closed, cannot send message." );
+        }
 
         if ( log.isDebugEnabled() )
         {
-            log.debug( "sending UDPDiscoveryMessage, message = " + message );
-        }
+            log.debug( "sending UDPDiscoveryMessage, address [" + multicastAddress + "], port [" + multicastPort
+                       + "], message = " + message );        }
 
         try
         {
@@ -145,9 +149,14 @@
             final byte[] bytes = byteStream.getBytes();
 
             // put the byte array in a packet
-            final DatagramPacket packet = new DatagramPacket( bytes, bytes.length, m_multicastAddress, m_multicastPort );
+            final DatagramPacket packet = new DatagramPacket( bytes, bytes.length, multicastAddress, multicastPort );
 
-            m_localSocket.send( packet );
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "Sending DatagramPacket. bytes.length [" + bytes.length + "] to " + multicastAddress + ":" + multicastPort  );
+            }
+            
+            localSocket.send( packet );
         }
         catch ( IOException e )
         {
@@ -159,6 +168,7 @@
     /**
      * Ask other to broadcast their info the the multicast address. If a lateral is non receiving it
      * can use this. This is also called on startup so we can get info.
+     * <p>
      * @throws IOException
      */
     public void requestBroadcast()
@@ -170,13 +180,13 @@
         }
 
         UDPDiscoveryMessage message = new UDPDiscoveryMessage();
-        message.setRequesterId( LateralCacheInfo.listenerId );
+        message.setRequesterId( UDPDiscoveryInfo.listenerId );
         message.setMessageType( UDPDiscoveryMessage.REQUEST_BROADCAST );
         send( message );
     }
 
     /**
-     * This sends a message braodcasting our that the host and port is available for connections.
+     * This sends a message broadcasting that the host and port are available for connections.
      * <p>
      * It uses the vmid as the requesterDI
      * @param host
@@ -192,9 +202,10 @@
 
     /**
      * This allows you to set the sender id. This is mainly for testing.
+     * <p>
      * @param host
      * @param port
-     * @param cacheNames
+     * @param cacheNames names of the cache regions
      * @param listenerId
      * @throws IOException
      */
@@ -214,10 +225,53 @@
         message.setMessageType( UDPDiscoveryMessage.PASSIVE_BROADCAST );
         send( message );
     }
+
+    /**
+     * This sends a message broadcasting that the host and port are no longer available.
+     * <p>
+     * It uses the vmid as the requesterID
+     * <p>
+     * @param host host
+     * @param port port
+     * @param cacheNames names of the cache regions
+     * @throws IOException on error
+     */
+    public void removeBroadcast( String host, int port, ArrayList cacheNames )
+        throws IOException
+    {
+        removeBroadcast( host, port, cacheNames, LateralCacheInfo.listenerId );
+    }
+
+    /**
+     * This allows you to set the sender id. This is mainly for testing.
+     * <p>
+     * @param host host
+     * @param port port
+     * @param cacheNames names of the cache regions
+     * @param listenerId listener ID
+     * @throws IOException on error
+     */
+    protected void removeBroadcast( String host, int port, ArrayList cacheNames, long listenerId )
+        throws IOException
+    {
+        if ( log.isDebugEnabled() )
+        {
+            log.debug( "sending removeBroadcast " );
+        }
+
+        UDPDiscoveryMessage message = new UDPDiscoveryMessage();
+        message.setHost( host );
+        message.setPort( port );
+        message.setCacheNames( cacheNames );
+        message.setRequesterId( listenerId );
+        message.setMessageType( UDPDiscoveryMessage.REMOVE_BROADCAST );
+        send( message );
+    }
 }
 
 /**
  * This allows us to get the byte array from an output stream.
+ * <p>
  * @author asmuts
  * @created January 15, 2002
  */
@@ -234,4 +288,3 @@
         return buf;
     }
 }
-// end class

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java
------------------------------------------------------------------------------
    cvs2svn:cvs-rev = 1.3

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Copied: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySenderThread.java (from r781592, jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoverySenderThread.java)
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySenderThread.java?p2=jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySenderThread.java&p1=jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoverySenderThread.java&r1=781592&r2=794825&rev=794825&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/discovery/UDPDiscoverySenderThread.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySenderThread.java Thu Jul 16 20:14:42 2009
@@ -1,4 +1,4 @@
-package org.apache.jcs.auxiliary.lateral.socket.tcp.discovery;
+package org.apache.jcs.utils.discovery;
 
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
@@ -33,17 +33,10 @@
     /** The logger. */
     private final static Log log = LogFactory.getLog( UDPDiscoverySenderThread.class );
 
-    /** the UDP multicast port */
-    private String discoveryAddress = "";
-
-    /** The port */
-    private int discoveryPort = 0;
-
-    /** the host and port we listen on for TCP socket connections */
-    private String myHostName = null;
-
-    /** The udp port */
-    private int myPort = 0;
+    /**
+     * details of the host, port, and service being advertised to listen for TCP socket connections
+     */
+    private UDPDiscoveryAttributes attributes;
 
     /** List of known regions. */
     private ArrayList cacheNames = new ArrayList();
@@ -72,34 +65,27 @@
      * Constructs the sender with the port to tell others to connect to.
      * <p>
      * On construction the sender will request that the other caches let it know their addresses.
-     * @param discoveryAddress host to broadcast to
-     * @param discoveryPort port to broadcast to
-     * @param myHostName host name we can be found at
-     * @param myPort port we are listening on
-     * @param cacheNames List of strings of the names of the regiond participating.
+     * @param attributes host, port, etc.
+     * @param cacheNames List of strings of the names of the regions participating.
      */
-    public UDPDiscoverySenderThread( String discoveryAddress, int discoveryPort, String myHostName, int myPort,
-                                     ArrayList cacheNames )
+    public UDPDiscoverySenderThread( UDPDiscoveryAttributes attributes, ArrayList cacheNames )
     {
-        this.discoveryAddress = discoveryAddress;
-        this.discoveryPort = discoveryPort;
-
-        this.myHostName = myHostName;
-        this.myPort = myPort;
+        this.attributes = attributes;
 
         this.cacheNames = cacheNames;
 
         if ( log.isDebugEnabled() )
         {
-            log.debug( "Creating sender thread for discoveryAddress = [" + discoveryAddress + "] and discoveryPort = ["
-                + discoveryPort + "] myHostName = [" + myHostName + "] and port = [" + myPort + "]" );
+            log.debug( "Creating sender thread for discoveryAddress = [" + attributes.getUdpDiscoveryAddr()
+                + "] and discoveryPort = [" + attributes.getUdpDiscoveryPort() + "] myHostName = ["
+                + attributes.getServiceAddress() + "] and port = [" + attributes.getServicePort() + "]" );
         }
 
         UDPDiscoverySender sender = null;
         try
         {
             // move this to the run method and determine how often to call it.
-            sender = new UDPDiscoverySender( discoveryAddress, discoveryPort );
+            sender = new UDPDiscoverySender( attributes.getUdpDiscoveryAddr(), attributes.getUdpDiscoveryPort() );
             sender.requestBroadcast();
 
             if ( log.isDebugEnabled() )
@@ -137,9 +123,9 @@
         {
             // create this connection each time.
             // more robust
-            sender = new UDPDiscoverySender( discoveryAddress, discoveryPort );
+            sender = new UDPDiscoverySender( attributes.getUdpDiscoveryAddr(), attributes.getUdpDiscoveryPort() );
 
-            sender.passiveBroadcast( myHostName, myPort, cacheNames );
+            sender.passiveBroadcast( attributes.getServiceAddress(), attributes.getServicePort(), cacheNames );
 
             // todo we should consider sending a request broadcast every so
             // often.
@@ -152,7 +138,8 @@
         }
         catch ( Exception e )
         {
-            log.error( "Problem calling the UDP Discovery Sender [" + discoveryAddress + ":" + discoveryPort + "]", e );
+            log.error( "Problem calling the UDP Discovery Sender [" + attributes.getUdpDiscoveryAddr() + ":"
+                + attributes.getUdpDiscoveryPort() + "]", e );
         }
         finally
         {
@@ -166,4 +153,43 @@
             }
         }
     }
+
+    /**
+     * Issues a remove broadcast to the others.
+     */
+    protected void shutdown()
+    {
+        UDPDiscoverySender sender = null;
+        try
+        {
+            // create this connection each time.
+            // more robust
+            sender = new UDPDiscoverySender( attributes.getUdpDiscoveryAddr(), attributes.getUdpDiscoveryPort() );
+
+            sender.removeBroadcast( attributes.getServiceAddress(), attributes.getServicePort(), cacheNames );
+
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "Called sender to issue a remove broadcast in shudown." );
+            }
+        }
+        catch ( Exception e )
+        {
+            log.error( "Problem calling the UDP Discovery Sender", e );
+        }
+        finally
+        {
+            try
+            {
+                if ( sender != null )
+                {
+                    sender.destroy();
+                }
+            }
+            catch ( Exception e )
+            {
+                log.error( "Problem closing Remote Broadcast sender", e );
+            }
+        }
+    }
 }

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySenderThread.java
------------------------------------------------------------------------------
    cvs2svn:cvs-rev = 1.2

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySenderThread.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySenderThread.java
------------------------------------------------------------------------------
    svn:keywords = Author Date Id Revision

Propchange: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySenderThread.java
------------------------------------------------------------------------------
    svn:mergeinfo = 



---------------------------------------------------------------------
To unsubscribe, e-mail: jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: jcs-dev-help@jakarta.apache.org