You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jcs-dev@jakarta.apache.org by as...@apache.org on 2009/07/22 23:13:39 UTC

svn commit: r796882 - in /jakarta/jcs/trunk/src: conf/ java/org/apache/jcs/auxiliary/lateral/ java/org/apache/jcs/auxiliary/lateral/socket/tcp/ java/org/apache/jcs/utils/discovery/ test/org/apache/jcs/auxiliary/lateral/

Author: asmuts
Date: Wed Jul 22 21:13:38 2009
New Revision: 796882

URL: http://svn.apache.org/viewvc?rev=796882&view=rev
Log:
Improving lateral TCP discovery.

Added:
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListenerManager.java
    jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacadeUnitTest.java
Modified:
    jakarta/jcs/trunk/src/conf/cacheTCP2.ccf
    jakarta/jcs/trunk/src/conf/cacheTCP3.ccf
    jakarta/jcs/trunk/src/conf/cacheTCP4.ccf
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCache.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheAbstractManager.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWait.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheManager.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPListener.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java
    jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java

Modified: jakarta/jcs/trunk/src/conf/cacheTCP2.ccf
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/conf/cacheTCP2.ccf?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
Binary files - no diff available.

Modified: jakarta/jcs/trunk/src/conf/cacheTCP3.ccf
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/conf/cacheTCP3.ccf?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
Binary files - no diff available.

Modified: jakarta/jcs/trunk/src/conf/cacheTCP4.ccf
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/conf/cacheTCP4.ccf?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
Binary files - no diff available.

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCache.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCache.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCache.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCache.java Wed Jul 22 21:13:38 2009
@@ -111,12 +111,12 @@
         catch ( NullPointerException npe )
         {
             log.error( "Failure updating lateral. lateral = " + lateral, npe );
-            handleException( npe, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() );
+            handleException( npe, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() + "@" + cattr );
             return;
         }
         catch ( Exception ex )
         {
-            handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() );
+            handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() + "@" + cattr );
         }
     }
 
@@ -143,7 +143,7 @@
         catch ( Exception e )
         {
             log.error( e );
-            handleException( e, "Failed to get [" + key + "] from " + this.cattr.getCacheName() );
+            handleException( e, "Failed to get [" + key + "] from " + cattr.getCacheName() + "@" + cattr );
         }
         return obj;
     }
@@ -170,7 +170,7 @@
         catch ( Exception e )
         {
             log.error( e );
-            handleException( e, "Failed to getMatching [" + pattern + "] from " + this.cattr.getCacheName() );
+            handleException( e, "Failed to getMatching [" + pattern + "] from " + cattr.getCacheName() + "@" + cattr );
         }
         return elements;
     }
@@ -239,7 +239,7 @@
         }
         catch ( Exception ex )
         {
-            handleException( ex, "Failed to remove " + key + " from " + this.cattr.getCacheName() );
+            handleException( ex, "Failed to remove " + key + " from " + cattr.getCacheName() + "@" + cattr );
         }
         return false;
     }
@@ -259,7 +259,7 @@
         }
         catch ( Exception ex )
         {
-            handleException( ex, "Failed to remove all from " + this.cattr.getCacheName() );
+            handleException( ex, "Failed to remove all from " + cattr.getCacheName() + "@" + cattr );
         }
     }
 
@@ -285,7 +285,7 @@
         catch ( Exception ex )
         {
             log.error( "Couldn't dispose", ex );
-            handleException( ex, "Failed to dispose " + this.cattr.getCacheName() );
+            handleException( ex, "Failed to dispose " + cattr.getCacheName() );
         }
     }
 

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheAbstractManager.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheAbstractManager.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheAbstractManager.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheAbstractManager.java Wed Jul 22 21:13:38 2009
@@ -33,6 +33,7 @@
 import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheObserver;
 import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheService;
 import org.apache.jcs.engine.behavior.IElementSerializer;
+import org.apache.jcs.engine.behavior.IShutdownObserver;
 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
 
 /**
@@ -49,7 +50,7 @@
  *        Acceptable for all - cache managers or a manager within a type
  */
 public abstract class LateralCacheAbstractManager
-    implements ILateralCacheManager
+    implements ILateralCacheManager, IShutdownObserver
 {
     /** Don't change */
     private static final long serialVersionUID = -515393179178435508L;

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWait.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWait.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWait.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWait.java Wed Jul 22 21:13:38 2009
@@ -420,6 +420,11 @@
         se.setData( "" + this.putCount );
         elems.add( se );
 
+        se = new StatElement();
+        se.setName( "Attributes" );
+        se.setData( "" + cache.getAuxiliaryCacheAttributes() );
+        elems.add( se );
+
         // get an array and put them in the Stats object
         IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[elems.size()] );
         stats.setStatElements( ses );

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java Wed Jul 22 21:13:38 2009
@@ -85,10 +85,29 @@
     }
 
     /**
+     * Tells you if the no wait is in the list or not.
+     * <p>
+     * @param noWait
+     * @return true if the noWait is in the list.
+     */
+    public boolean containsNoWait( LateralCacheNoWait noWait )
+    {
+        for ( int i = 0; i < noWaits.length; i++ )
+        {
+            // we know noWait isn't null
+            if ( noWait.equals( noWaits[i] ) )
+            {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
      * Adds a no wait to the list if it isn't already in the list.
      * <p>
      * @param noWait
-     * @return true if it wasn't alreay contained
+     * @return true if it wasn't already contained
      */
     public synchronized boolean addNoWait( LateralCacheNoWait noWait )
     {
@@ -97,17 +116,13 @@
             return false;
         }
 
-        for ( int i = 0; i < noWaits.length; i++ )
+        if ( containsNoWait( noWait ) )
         {
-            // we know noWait isn't null
-            if ( noWait.equals( noWaits[i] ) )
+            if ( log.isDebugEnabled() )
             {
-                if ( log.isDebugEnabled() )
-                {
-                    log.debug( "No Wait already contained, [" + noWait + "]" );
-                }
-                return false;
+                log.debug( "No Wait already contained, [" + noWait + "]" );
             }
+            return false;
         }
 
         LateralCacheNoWait[] newArray = new LateralCacheNoWait[noWaits.length + 1];
@@ -142,28 +157,27 @@
             if ( noWait.equals( noWaits[i] ) )
             {
                 position = i;
+                break;
             }
         }
-
+        
         if ( position == -1 )
         {
             return false;
         }
-        
-        LateralCacheNoWait[] newArray = new LateralCacheNoWait[noWaits.length -1];
 
-        if ( position > 0 )
+        LateralCacheNoWait[] newArray = new LateralCacheNoWait[noWaits.length - 1];
+
+        System.arraycopy( noWaits, 0, newArray, 0, position );
+        if ( noWaits.length != position )
         {
-            System.arraycopy( noWaits, 0, newArray, 0, position -1 );
+            System.arraycopy( noWaits, position + 1, newArray, position, noWaits.length - position - 1 );
         }
-        System.arraycopy( noWaits, position +1, newArray, 0, noWaits.length );
-
         noWaits = newArray;
 
         return true;
     }
 
-    
     /**
      * @param ce
      * @throws IOException

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java Wed Jul 22 21:13:38 2009
@@ -32,8 +32,11 @@
 import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
 import org.apache.jcs.engine.behavior.ICache;
+import org.apache.jcs.engine.behavior.ICacheListener;
 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
 import org.apache.jcs.engine.behavior.IElementSerializer;
+import org.apache.jcs.engine.behavior.IShutdownObservable;
+import org.apache.jcs.engine.behavior.IShutdownObserver;
 import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
 import org.apache.jcs.utils.discovery.UDPDiscoveryManager;
 import org.apache.jcs.utils.discovery.UDPDiscoveryService;
@@ -51,6 +54,9 @@
     /** The logger */
     private final static Log log = LogFactory.getLog( LateralTCPCacheFactory.class );
 
+    /** Non singleton manager. Used by this instance of the factory. */
+    private LateralTCPDiscoveryListenerManager lateralTCPDiscoveryListenerManager;
+
     /**
      * Creates a TCP lateral.
      * <p>
@@ -66,7 +72,7 @@
         ITCPLateralCacheAttributes lac = (ITCPLateralCacheAttributes) iaca;
         ArrayList noWaits = new ArrayList();
 
-        // pars up the tcp servers and set the tcpServer value and
+        // pairs up the tcp servers and set the tcpServer value and
         // get the manager and then get the cache
         // no servers are required.
         if ( lac.getTcpServers() != null )
@@ -126,13 +132,19 @@
         {
             if ( log.isInfoEnabled() )
             {
-                log.info( "Creating listener for " + lac );
+                log.info( "Getting listener for " + lac );
             }
 
             try
             {
                 // make a listener. if one doesn't exist
-                LateralTCPListener.getInstance( attr, cacheMgr );
+                ICacheListener listener = LateralTCPListener.getInstance( attr, cacheMgr );
+
+                // register for shutdown notification
+                if ( listener instanceof IShutdownObserver && cacheMgr instanceof IShutdownObservable )
+                {
+                    ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( (IShutdownObserver) listener );
+                }
             }
             catch ( Exception e )
             {
@@ -158,22 +170,27 @@
      * @param elementSerializer
      * @return null if none is created.
      */
-    private UDPDiscoveryService createDiscoveryService( ITCPLateralCacheAttributes lac, LateralCacheNoWaitFacade lcnwf,
-                                                        ICompositeCacheManager cacheMgr,
-                                                        ICacheEventLogger cacheEventLogger,
-                                                        IElementSerializer elementSerializer )
+    private synchronized UDPDiscoveryService createDiscoveryService( ITCPLateralCacheAttributes lac,
+                                                                     LateralCacheNoWaitFacade lcnwf,
+                                                                     ICompositeCacheManager cacheMgr,
+                                                                     ICacheEventLogger cacheEventLogger,
+                                                                     IElementSerializer elementSerializer )
     {
         UDPDiscoveryService discovery = null;
 
         // create the UDP discovery for the TCP lateral
         if ( lac.isUdpDiscoveryEnabled() )
         {
-            // TODO this will create one for each region, but one can be used for all regions
-            LateralTCPDiscoveryListener discoveryListener = new LateralTCPDiscoveryListener( cacheMgr,
-                                                                                             cacheEventLogger,
-                                                                                             elementSerializer );
+            if ( lateralTCPDiscoveryListenerManager == null )
+            {
+                lateralTCPDiscoveryListenerManager = new LateralTCPDiscoveryListenerManager();
+            }
+
+            // One can be used for all regions
+            LateralTCPDiscoveryListener discoveryListener = lateralTCPDiscoveryListenerManager
+                .getDiscoveryListener( lac, cacheMgr, cacheEventLogger, elementSerializer );
 
-            discoveryListener.addNoWaitFacade( lcnwf, lac.getCacheName() );
+            discoveryListener.addNoWaitFacade( lac.getCacheName(), lcnwf );
 
             // need a factory for this so it doesn't
             // get dereferenced, also we don't want one for every region.
@@ -187,7 +204,7 @@
 
             if ( log.isInfoEnabled() )
             {
-                log.info( "Created UDPDiscoveryService for TCP lateral cache." );
+                log.info( "Registered TCP lateral cache [" + lac.getCacheName() + "] with UDPDiscoveryService." );
             }
         }
         return discovery;

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheManager.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheManager.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheManager.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheManager.java Wed Jul 22 21:13:38 2009
@@ -104,18 +104,19 @@
     {
         synchronized ( instances )
         {
-            LateralTCPCacheManager ins = (LateralTCPCacheManager) instances.get( lca.toString() );
+            String key = lca.getTcpServer();
+            LateralTCPCacheManager ins = (LateralTCPCacheManager) instances.get( key );
             if ( ins == null )
             {
-                log.info( "Instance for [" + lca.toString() + "] is null, creating" );
+                log.info( "Instance for [" + key + "] is null, creating" );
 
-                ins = (LateralTCPCacheManager) instances.get( lca.toString() );
+                ins = (LateralTCPCacheManager) instances.get( lca.getTcpServer() );
                 if ( ins == null )
                 {
                     ins = new LateralTCPCacheManager( lca, cacheMgr, cacheEventLogger, elementSerializer );
-                    instances.put( lca.toString(), ins );
+                    instances.put( key, ins );
                 }
-                
+
                 createMonitor( ins );
             }
             ins.clients++;
@@ -153,8 +154,8 @@
      * <p>
      * @param lcaA
      * @param cacheMgr
-     * @param cacheEventLogger 
-     * @param elementSerializer 
+     * @param cacheEventLogger
+     * @param elementSerializer
      */
     private LateralTCPCacheManager( ITCPLateralCacheAttributes lcaA, ICompositeCacheManager cacheMgr,
                                     ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer )
@@ -217,7 +218,7 @@
     {
         synchronized ( this.caches )
         {
-            this.lateralWatch.addCacheListener( cacheName, listener );
+            lateralWatch.addCacheListener( cacheName, listener );
         }
     }
 
@@ -236,9 +237,9 @@
     public AuxiliaryCache getCache( String cacheName )
     {
         LateralCacheNoWait lateralNoWait = null;
-        synchronized ( this.caches )
+        synchronized ( caches )
         {
-            lateralNoWait = (LateralCacheNoWait) this.caches.get( cacheName );
+            lateralNoWait = (LateralCacheNoWait) caches.get( cacheName );
             if ( lateralNoWait == null )
             {
                 LateralCacheAttributes attr = (LateralCacheAttributes) lca.copy();
@@ -257,22 +258,35 @@
                 lateralNoWait.setCacheEventLogger( cacheEventLogger );
                 lateralNoWait.setElementSerializer( elementSerializer );
 
-                this.caches.put( cacheName, lateralNoWait );
+                caches.put( cacheName, lateralNoWait );
 
                 if ( log.isInfoEnabled() )
                 {
-                    log.info( "Created LateralCacheNoWait for [" + this.lca + "] LateralCacheNoWait = ["
-                        + lateralNoWait + "]" );
+                    log.info( "Created LateralCacheNoWait for [" + lca + "] LateralCacheNoWait = [" + lateralNoWait
+                        + "]" );
                 }
+
+                // This used to be called on every getCache. I moved it in here.
+                addListenerIfNeeded( cacheName );
             }
         }
 
+        return lateralNoWait;
+    }
+
+    /**
+     * TODO see if this belongs here or in the factory.
+     * <p>
+     * @param cacheName
+     */
+    private void addListenerIfNeeded( String cacheName )
+    {
         // don't create a listener if we are not receiving.
         if ( lca.isReceive() )
         {
             try
             {
-                addLateralCacheListener( cacheName, LateralTCPListener.getInstance( this.lca, cacheMgr ) );
+                addLateralCacheListener( cacheName, LateralTCPListener.getInstance( lca, cacheMgr ) );
             }
             catch ( IOException ioe )
             {
@@ -290,9 +304,6 @@
                 log.debug( "Not creating a listener since we are not receiving." );
             }
         }
-        // TODO: need listener repair
-
-        return lateralNoWait;
     }
 
     /**
@@ -322,4 +333,21 @@
         }
         return service;
     }
+
+    /**
+     * Shuts down the lateral.
+     */
+    public void shutdown()
+    {
+        // TODO revisit this.
+        // the name here doesn't matter.
+        try
+        {
+            lateralService.dispose( "ALL" );
+        }
+        catch ( IOException e )
+        {
+            log.error( "Problem disposing of service", e );
+        }
+    }
 }

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java Wed Jul 22 21:13:38 2009
@@ -1,6 +1,7 @@
 package org.apache.jcs.auxiliary.lateral.socket.tcp;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -33,7 +34,7 @@
      * Map of no wait facades. these are used to determine which regions are locally configured to
      * use laterals.
      */
-    private Map facades = new HashMap();
+    private Map facades = Collections.synchronizedMap( new HashMap() );
 
     /** The cache manager. */
     private ICompositeCacheManager cacheMgr;
@@ -65,11 +66,11 @@
      * This adds nowaits to a facade for the region name. If the region has no facade, then it is
      * not configured to use the lateral cache, and no facade will be created.
      * <p>
-     * @param facade
+     * @param facade - facade (for region) => multiple lateral clients.
      * @param cacheName
      * @return true if the facade was not already registered.
      */
-    public synchronized boolean addNoWaitFacade( LateralCacheNoWaitFacade facade, String cacheName )
+    public synchronized boolean addNoWaitFacade( String cacheName, LateralCacheNoWaitFacade facade )
     {
         boolean isNew = !facades.containsKey( cacheName );
 
@@ -109,8 +110,8 @@
         {
             if ( log.isInfoEnabled() )
             {
-                log.info( "Different nodes are configured differently.  Region [" + noWait.getCacheName()
-                    + "] is not configured to use the lateral cache." );
+                log.info( "addNoWait > Different nodes are configured differently or region [" + noWait.getCacheName()
+                    + "] is not yet used on this side.  " );
             }
         }
     }
@@ -131,18 +132,18 @@
 
         if ( facade != null )
         {
-            boolean isNew = facade.addNoWait( noWait );
+            boolean removed = facade.removeNoWait( noWait );
             if ( log.isDebugEnabled() )
             {
-                log.debug( "Called addNoWait, isNew = " + isNew );
+                log.debug( "Called removeNoWait, removed " + removed );
             }
         }
         else
         {
             if ( log.isInfoEnabled() )
             {
-                log.info( "Different nodes are configured differently.  Region [" + noWait.getCacheName()
-                    + "] is not configured to use the lateral cache." );
+                log.info( "removeNoWait > Different nodes are configured differently or region ["
+                    + noWait.getCacheName() + "] is not yet used on this side.  " );
             }
         }
     }
@@ -150,6 +151,17 @@
     /**
      * Creates the lateral cache if needed.
      * <p>
+     * We could go to the composite cache manager and get the cache for the region. This would
+     * force a full configuration of the region. One advantage of this would be that the creation of
+     * the lateral would go through the factory, which would add the item to the no wait list. But we
+     * don't want to do this. This would force this client to have all the regions as the other.
+     * This might not be desired. We don't want to send or receive for a region here that is either
+     * not used or not configured to use the lateral.
+     * <p>
+     * Right now, I'm afraid that the region will get puts if another instance has the region
+     * configured to use the lateral and our address is configured. This might be a bug, but it
+     * shouldn't happen with discovery.
+     * <p>
      * @param service
      */
     public void addDiscoveredService( DiscoveredService service )
@@ -202,6 +214,9 @@
     /**
      * Removes the lateral cache.
      * <p>
+     * We need to tell the manager that this instance is bad, so it will reconnect the sender if it
+     * comes back.
+     * <p>
      * @param service
      */
     public void removeDiscoveredService( DiscoveredService service )

Added: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListenerManager.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListenerManager.java?rev=796882&view=auto
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListenerManager.java (added)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListenerManager.java Wed Jul 22 21:13:38 2009
@@ -0,0 +1,70 @@
+package org.apache.jcs.auxiliary.lateral.socket.tcp;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
+import org.apache.jcs.engine.behavior.ICompositeCacheManager;
+import org.apache.jcs.engine.behavior.IElementSerializer;
+import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
+
+/**
+ * The factory holds an instance of this manager. This manager has a map of listeners, keyed to the
+ * discovery configuration. I'm not using a static map, because I'm trying to make JCS
+ * multi-instance.
+ * <p>
+ * During configuration, the factory is only created once per auxiliary definition. Two different
+ * laterals cannot use the same discovery service. We will likely want to change this.
+ */
+public class LateralTCPDiscoveryListenerManager
+{
+    /** Map of listeners, keyed by "udpDiscoveryAddr:udpDiscoveryPort". Note, this is not static. */
+    protected Map instances = Collections.synchronizedMap( new HashMap() );
+
+    /** The logger */
+    private final static Log log = LogFactory.getLog( LateralTCPDiscoveryListenerManager.class );
+
+    /** Logs the creation at info level; no other setup is performed. */
+    public LateralTCPDiscoveryListenerManager()
+    {
+        if ( log.isInfoEnabled() )
+        {
+            log.info( "Creating new LateralTCPDiscoveryListenerManager" );
+        }
+    }
+
+    /**
+     * Gets the discovery listener for the UDP discovery address and port, creating one if needed.
+     * <p>
+     * @param ilca ITCPLateralCacheAttributes supplying the UDP discovery address and port
+     * @param cacheMgr passed to the constructor of a newly created listener
+     * @param cacheEventLogger passed to the constructor of a newly created listener
+     * @param elementSerializer passed to the constructor of a newly created listener
+     * @return the listener registered under the discovery address and port; never null
+     */
+    public synchronized LateralTCPDiscoveryListener getDiscoveryListener( ITCPLateralCacheAttributes ilca,
+                                                                          ICompositeCacheManager cacheMgr,
+                                                                          ICacheEventLogger cacheEventLogger,
+                                                                          IElementSerializer elementSerializer )
+    {
+        String key = ilca.getUdpDiscoveryAddr() + ":" + ilca.getUdpDiscoveryPort();
+        LateralTCPDiscoveryListener ins = (LateralTCPDiscoveryListener) instances.get( key );
+
+        if ( ins == null )
+        {
+            ins = new LateralTCPDiscoveryListener( cacheMgr, cacheEventLogger, elementSerializer );
+
+            instances.put( key, ins ); // must be the lookup key, or every call creates a new listener
+
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "Created new discovery listener for [" + key + "]" );
+            }
+        }
+
+        return ins;
+    }
+}

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPListener.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPListener.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPListener.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPListener.java Wed Jul 22 21:13:38 2009
@@ -37,6 +37,7 @@
 import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
 import org.apache.jcs.engine.behavior.ICacheElement;
 import org.apache.jcs.engine.behavior.ICompositeCacheManager;
+import org.apache.jcs.engine.behavior.IShutdownObserver;
 import org.apache.jcs.engine.control.CompositeCache;
 import org.apache.jcs.engine.control.CompositeCacheManager;
 
@@ -49,7 +50,7 @@
  * passed to a pooled executor which then calls the appropriate handle method.
  */
 public class LateralTCPListener
-    implements ILateralCacheListener, Serializable
+    implements ILateralCacheListener, Serializable, IShutdownObserver
 {
     /** Don't change. */
     private static final long serialVersionUID = -9107062664967131738L;
@@ -93,6 +94,9 @@
      */
     private long listenerId = LateralCacheInfo.listenerId;
 
+    /** is this shut down? */
+    private boolean shutdown = false;
+
     /**
      * Gets the instance attribute of the LateralCacheTCPListener class.
      * <p>
@@ -115,9 +119,9 @@
 
             instances.put( String.valueOf( ilca.getTcpListenerPort() ), ins );
 
-            if ( log.isDebugEnabled() )
+            if ( log.isInfoEnabled() )
             {
-                log.debug( "created new listener " + ilca.getTcpListenerPort() );
+                log.info( "Created new listener " + ilca.getTcpListenerPort() );
             }
         }
 
@@ -322,7 +326,7 @@
 
         return getCache( cacheName ).localGetMatching( pattern );
     }
-    
+
     /**
      * Right now this does nothing.
      * <p>
@@ -333,12 +337,10 @@
     {
         if ( log.isInfoEnabled() )
         {
-            log.info( "handleDispose > cacheName=" + cacheName );
+            log.info( "handleDispose > cacheName=" + cacheName + " | Ignoring message.  Do not dispose from remote." );
         }
 
         // TODO handle active deregistration, rather than passive detection
-        // through error
-        // getCacheManager().freeCache( cacheName, true );
     }
 
     /**
@@ -443,7 +445,7 @@
 
                 ConnectionHandler handler;
 
-                while ( true )
+                while ( !shutdown )
                 {
                     if ( log.isDebugEnabled() )
                     {
@@ -538,11 +540,11 @@
             }
             catch ( java.io.EOFException e )
             {
-                log.info( "Caught java.io.EOFException closing connection." );
+                log.info( "Caught java.io.EOFException closing connection." + e.getMessage() );
             }
             catch ( java.net.SocketException e )
             {
-                log.info( "Caught java.net.SocketException closing connection." );
+                log.info( "Caught java.net.SocketException closing connection." + e.getMessage() );
             }
             catch ( Exception e )
             {
@@ -628,7 +630,7 @@
             }
             else if ( led.command == LateralElementDescriptor.GET_MATCHING )
             {
-                Map obj = handleGetMatching( cacheName, (String)key );
+                Map obj = handleGetMatching( cacheName, (String) key );
 
                 ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
 
@@ -661,4 +663,28 @@
             return t;
         }
     }
+
+    /**
+     * Shuts down the receiver.
+     */
+    public void shutdown()
+    {
+        if ( !shutdown )
+        {
+            shutdown = true;
+
+            if ( log.isInfoEnabled() )
+            {
+                log.info( "Shutting down TCP Lateral receiver." );
+            }
+            receiver.interrupt();
+        }
+        else
+        {
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "Shutdown already called." );
+            }
+        }
+    }
 }

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java Wed Jul 22 21:13:38 2009
@@ -94,10 +94,12 @@
             attributes.setServicePort( servicePort );
 
             service = new UDPDiscoveryService( attributes, cacheEventLogger );
-            
-            // register for shutdown notification
-            ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( service );
 
+            // register for shutdown notification
+            if ( cacheMgr instanceof IShutdownObservable )
+            {
+                ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( service );
+            }
             services.put( key, service );
         }
 

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java Wed Jul 22 21:13:38 2009
@@ -68,9 +68,9 @@
     {
         try
         {
-            if ( log.isInfoEnabled() )
+            if ( log.isDebugEnabled() )
             {
-                log.info( "Constructing socket for sender." );
+                log.debug( "Constructing socket for sender on port [" + port + "]" );
             }
             localSocket = new MulticastSocket( port );
 

Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java Wed Jul 22 21:13:38 2009
@@ -116,8 +116,6 @@
                 + getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "] we won't be able to find any other caches", e );
         }
 
-        // todo only do the passive if receive is enabled, perhaps set the
-        // myhost to null or something on the request
         if ( senderDaemon == null )
         {
             senderDaemon = new ClockDaemon();
@@ -125,16 +123,15 @@
         }
 
         // create a sender thread
-        sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), this.getCacheNames() );
-
-        senderDaemon.executePeriodically( 30 * 1000, sender, false );
+        sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), getCacheNames() );
+        senderDaemon.executePeriodically( 15 * 1000, sender, true );        
 
         // add the cleanup daemon too
         cleanup = new UDPCleanupRunner( this );
         // I'm going to use this as both, but it could happen
-        // that something could hang around twice the time suing this as the
+        // that something could hang around twice the time using this as the
         // delay and the idle time.
-        senderDaemon.executePeriodically( this.getUdpDiscoveryAttributes().getMaxIdleTimeSec() * 1000, cleanup, false );
+        senderDaemon.executePeriodically( getUdpDiscoveryAttributes().getMaxIdleTimeSec() * 1000, cleanup, false );
 
         // add shutdown hook that will issue a remove call.
         DiscoveryShutdownHook shutdownHook = new DiscoveryShutdownHook( this );
@@ -208,7 +205,15 @@
      */
     public void removeDiscoveredService( DiscoveredService service )
     {
-        getDiscoveredServices().remove( service );
+        boolean contained = getDiscoveredServices().remove( service );
+        
+        if ( contained )
+        {
+            if ( log.isInfoEnabled() )
+            {
+                log.info( "Removing " + service );
+            }
+        }
         
         Iterator it = getDiscoveryListeners().iterator();
         while ( it.hasNext() )

Added: jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacadeUnitTest.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacadeUnitTest.java?rev=796882&view=auto
==============================================================================
--- jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacadeUnitTest.java (added)
+++ jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacadeUnitTest.java Wed Jul 22 21:13:38 2009
@@ -0,0 +1,121 @@
+package org.apache.jcs.auxiliary.lateral;
+
+import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for LateralCacheNoWaitFacade.
+ */
+public class LateralCacheNoWaitFacadeUnitTest
+    extends TestCase
+{
+    /**
+     * Verify that we can remove an item.
+     */
+    public void testAddThenRemoveNoWait_InList()
+    {
+        // SETUP
+        LateralCacheNoWait[] noWaits = new LateralCacheNoWait[0];
+        ILateralCacheAttributes cattr = new LateralCacheAttributes();
+        cattr.setCacheName( "testCache1" );
+        
+        LateralCacheNoWaitFacade facade = new LateralCacheNoWaitFacade( noWaits, cattr );
+        
+        LateralCache cache = new LateralCache( cattr );
+        LateralCacheNoWait noWait = new LateralCacheNoWait( cache );
+        
+        // DO WORK
+        facade.addNoWait( noWait );
+
+        // VERIFY
+        assertTrue( "Should be in the list.", facade.containsNoWait( noWait ) );
+        
+        // DO WORK
+        facade.removeNoWait( noWait );
+        
+        // VERIFY
+        assertEquals( "Should have 0", 0, facade.noWaits.length );               
+        assertFalse( "Should not be in the list. ", facade.containsNoWait( noWait ) );
+    }
+    
+    /**
+     * Verify that we can remove an item.
+     */
+    public void testAddThenRemoveNoWait_InListSize2()
+    {
+        // SETUP
+        LateralCacheNoWait[] noWaits = new LateralCacheNoWait[0];
+        ILateralCacheAttributes cattr = new LateralCacheAttributes();
+        cattr.setCacheName( "testCache1" );
+        
+        LateralCacheNoWaitFacade facade = new LateralCacheNoWaitFacade( noWaits, cattr );
+        
+        LateralCache cache = new LateralCache( cattr );
+        LateralCacheNoWait noWait = new LateralCacheNoWait( cache );
+        LateralCacheNoWait noWait2 = new LateralCacheNoWait( cache );
+        
+        // DO WORK
+        facade.addNoWait( noWait );
+        facade.addNoWait( noWait2 );
+
+        // VERIFY
+        assertEquals( "Should have 2", 2, facade.noWaits.length );        
+        assertTrue( "Should be in the list.", facade.containsNoWait( noWait ) );
+        assertTrue( "Should be in the list.", facade.containsNoWait( noWait2 ) );
+        
+        // DO WORK
+        facade.removeNoWait( noWait );
+        
+        // VERIFY        
+        assertEquals( "Should only have 1", 1, facade.noWaits.length );        
+        assertFalse( "Should not be in the list. ", facade.containsNoWait( noWait ) );
+        assertTrue( "Should be in the list.", facade.containsNoWait( noWait2 ) );
+    }
+    
+    /**
+     * Verify that we can remove an item.
+     */
+    public void testAdd_InList()
+    {
+        // SETUP
+        LateralCacheNoWait[] noWaits = new LateralCacheNoWait[0];
+        ILateralCacheAttributes cattr = new LateralCacheAttributes();
+        cattr.setCacheName( "testCache1" );
+        
+        LateralCacheNoWaitFacade facade = new LateralCacheNoWaitFacade( noWaits, cattr );
+        
+        LateralCache cache = new LateralCache( cattr );
+        LateralCacheNoWait noWait = new LateralCacheNoWait( cache );
+        
+        // DO WORK
+        facade.addNoWait( noWait );
+        facade.addNoWait( noWait );
+
+        // VERIFY
+        assertTrue( "Should be in the list.", facade.containsNoWait( noWait ) );
+        assertEquals( "Should only have 1", 1, facade.noWaits.length );
+    }
+    
+    /**
+     * Verify that we can remove an item.
+     */
+    public void testAddThenRemoveNoWait_NotInList()
+    {
+        // SETUP
+        LateralCacheNoWait[] noWaits = new LateralCacheNoWait[0];
+        ILateralCacheAttributes cattr = new LateralCacheAttributes();
+        cattr.setCacheName( "testCache1" );
+        
+        LateralCacheNoWaitFacade facade = new LateralCacheNoWaitFacade( noWaits, cattr );
+        
+        LateralCache cache = new LateralCache( cattr );
+        LateralCacheNoWait noWait = new LateralCacheNoWait( cache );       
+        
+        // DO WORK
+        facade.removeNoWait( noWait );
+        
+        // VERIFY
+        assertFalse( "Should not be in the list.", facade.containsNoWait( noWait ) );
+    }    
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: jcs-dev-help@jakarta.apache.org