You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jcs-dev@jakarta.apache.org by as...@apache.org on 2009/07/22 23:13:39 UTC
svn commit: r796882 - in /jakarta/jcs/trunk/src: conf/
java/org/apache/jcs/auxiliary/lateral/
java/org/apache/jcs/auxiliary/lateral/socket/tcp/
java/org/apache/jcs/utils/discovery/ test/org/apache/jcs/auxiliary/lateral/
Author: asmuts
Date: Wed Jul 22 21:13:38 2009
New Revision: 796882
URL: http://svn.apache.org/viewvc?rev=796882&view=rev
Log:
Improving lateral TCP discovery.
Added:
jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListenerManager.java
jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacadeUnitTest.java
Modified:
jakarta/jcs/trunk/src/conf/cacheTCP2.ccf
jakarta/jcs/trunk/src/conf/cacheTCP3.ccf
jakarta/jcs/trunk/src/conf/cacheTCP4.ccf
jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCache.java
jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheAbstractManager.java
jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWait.java
jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java
jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheManager.java
jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPListener.java
jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java
jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java
jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java
Modified: jakarta/jcs/trunk/src/conf/cacheTCP2.ccf
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/conf/cacheTCP2.ccf?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
Binary files - no diff available.
Modified: jakarta/jcs/trunk/src/conf/cacheTCP3.ccf
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/conf/cacheTCP3.ccf?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
Binary files - no diff available.
Modified: jakarta/jcs/trunk/src/conf/cacheTCP4.ccf
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/conf/cacheTCP4.ccf?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
Binary files - no diff available.
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCache.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCache.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCache.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCache.java Wed Jul 22 21:13:38 2009
@@ -111,12 +111,12 @@
catch ( NullPointerException npe )
{
log.error( "Failure updating lateral. lateral = " + lateral, npe );
- handleException( npe, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() );
+ handleException( npe, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() + "@" + cattr );
return;
}
catch ( Exception ex )
{
- handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() );
+ handleException( ex, "Failed to put [" + ce.getKey() + "] to " + ce.getCacheName() + "@" + cattr );
}
}
@@ -143,7 +143,7 @@
catch ( Exception e )
{
log.error( e );
- handleException( e, "Failed to get [" + key + "] from " + this.cattr.getCacheName() );
+ handleException( e, "Failed to get [" + key + "] from " + cattr.getCacheName() + "@" + cattr );
}
return obj;
}
@@ -170,7 +170,7 @@
catch ( Exception e )
{
log.error( e );
- handleException( e, "Failed to getMatching [" + pattern + "] from " + this.cattr.getCacheName() );
+ handleException( e, "Failed to getMatching [" + pattern + "] from " + cattr.getCacheName() + "@" + cattr );
}
return elements;
}
@@ -239,7 +239,7 @@
}
catch ( Exception ex )
{
- handleException( ex, "Failed to remove " + key + " from " + this.cattr.getCacheName() );
+ handleException( ex, "Failed to remove " + key + " from " + cattr.getCacheName() + "@" + cattr );
}
return false;
}
@@ -259,7 +259,7 @@
}
catch ( Exception ex )
{
- handleException( ex, "Failed to remove all from " + this.cattr.getCacheName() );
+ handleException( ex, "Failed to remove all from " + cattr.getCacheName() + "@" + cattr );
}
}
@@ -285,7 +285,7 @@
catch ( Exception ex )
{
log.error( "Couldn't dispose", ex );
- handleException( ex, "Failed to dispose " + this.cattr.getCacheName() );
+ handleException( ex, "Failed to dispose " + cattr.getCacheName() );
}
}
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheAbstractManager.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheAbstractManager.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheAbstractManager.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheAbstractManager.java Wed Jul 22 21:13:38 2009
@@ -33,6 +33,7 @@
import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheObserver;
import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheService;
import org.apache.jcs.engine.behavior.IElementSerializer;
+import org.apache.jcs.engine.behavior.IShutdownObserver;
import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
/**
@@ -49,7 +50,7 @@
* Acceptable for all - cache managers or a manager within a type
*/
public abstract class LateralCacheAbstractManager
- implements ILateralCacheManager
+ implements ILateralCacheManager, IShutdownObserver
{
/** Don't change */
private static final long serialVersionUID = -515393179178435508L;
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWait.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWait.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWait.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWait.java Wed Jul 22 21:13:38 2009
@@ -420,6 +420,11 @@
se.setData( "" + this.putCount );
elems.add( se );
+ se = new StatElement();
+ se.setName( "Attributes" );
+ se.setData( "" + cache.getAuxiliaryCacheAttributes() );
+ elems.add( se );
+
// get an array and put them in the Stats object
IStatElement[] ses = (IStatElement[]) elems.toArray( new StatElement[elems.size()] );
stats.setStatElements( ses );
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacade.java Wed Jul 22 21:13:38 2009
@@ -85,10 +85,29 @@
}
/**
+ * Tells you if the no wait is in the list or not.
+ * <p>
+ * @param noWait
+ * @return true if the noWait is in the list.
+ */
+ public boolean containsNoWait( LateralCacheNoWait noWait )
+ {
+ for ( int i = 0; i < noWaits.length; i++ )
+ {
+ // we know noWait isn't null
+ if ( noWait.equals( noWaits[i] ) )
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Adds a no wait to the list if it isn't already in the list.
* <p>
* @param noWait
- * @return true if it wasn't alreay contained
+ * @return true if it wasn't already contained
*/
public synchronized boolean addNoWait( LateralCacheNoWait noWait )
{
@@ -97,17 +116,13 @@
return false;
}
- for ( int i = 0; i < noWaits.length; i++ )
+ if ( containsNoWait( noWait ) )
{
- // we know noWait isn't null
- if ( noWait.equals( noWaits[i] ) )
+ if ( log.isDebugEnabled() )
{
- if ( log.isDebugEnabled() )
- {
- log.debug( "No Wait already contained, [" + noWait + "]" );
- }
- return false;
+ log.debug( "No Wait already contained, [" + noWait + "]" );
}
+ return false;
}
LateralCacheNoWait[] newArray = new LateralCacheNoWait[noWaits.length + 1];
@@ -142,28 +157,27 @@
if ( noWait.equals( noWaits[i] ) )
{
position = i;
+ break;
}
}
-
+
if ( position == -1 )
{
return false;
}
-
- LateralCacheNoWait[] newArray = new LateralCacheNoWait[noWaits.length -1];
- if ( position > 0 )
+ LateralCacheNoWait[] newArray = new LateralCacheNoWait[noWaits.length - 1];
+
+ System.arraycopy( noWaits, 0, newArray, 0, position );
+ if ( noWaits.length != position )
{
- System.arraycopy( noWaits, 0, newArray, 0, position -1 );
+ System.arraycopy( noWaits, position + 1, newArray, position, noWaits.length - position - 1 );
}
- System.arraycopy( noWaits, position +1, newArray, 0, noWaits.length );
-
noWaits = newArray;
return true;
}
-
/**
* @param ce
* @throws IOException
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheFactory.java Wed Jul 22 21:13:38 2009
@@ -32,8 +32,11 @@
import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
import org.apache.jcs.engine.behavior.ICache;
+import org.apache.jcs.engine.behavior.ICacheListener;
import org.apache.jcs.engine.behavior.ICompositeCacheManager;
import org.apache.jcs.engine.behavior.IElementSerializer;
+import org.apache.jcs.engine.behavior.IShutdownObservable;
+import org.apache.jcs.engine.behavior.IShutdownObserver;
import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
import org.apache.jcs.utils.discovery.UDPDiscoveryManager;
import org.apache.jcs.utils.discovery.UDPDiscoveryService;
@@ -51,6 +54,9 @@
/** The logger */
private final static Log log = LogFactory.getLog( LateralTCPCacheFactory.class );
+ /** Non singleton manager. Used by this instance of the factory. */
+ private LateralTCPDiscoveryListenerManager lateralTCPDiscoveryListenerManager;
+
/**
* Creates a TCP lateral.
* <p>
@@ -66,7 +72,7 @@
ITCPLateralCacheAttributes lac = (ITCPLateralCacheAttributes) iaca;
ArrayList noWaits = new ArrayList();
- // pars up the tcp servers and set the tcpServer value and
+ // pairs up the tcp servers and set the tcpServer value and
// get the manager and then get the cache
// no servers are required.
if ( lac.getTcpServers() != null )
@@ -126,13 +132,19 @@
{
if ( log.isInfoEnabled() )
{
- log.info( "Creating listener for " + lac );
+ log.info( "Getting listener for " + lac );
}
try
{
// make a listener. if one doesn't exist
- LateralTCPListener.getInstance( attr, cacheMgr );
+ ICacheListener listener = LateralTCPListener.getInstance( attr, cacheMgr );
+
+ // register for shutdown notification
+ if ( listener instanceof IShutdownObserver && cacheMgr instanceof IShutdownObservable )
+ {
+ ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( (IShutdownObserver) listener );
+ }
}
catch ( Exception e )
{
@@ -158,22 +170,27 @@
* @param elementSerializer
* @return null if none is created.
*/
- private UDPDiscoveryService createDiscoveryService( ITCPLateralCacheAttributes lac, LateralCacheNoWaitFacade lcnwf,
- ICompositeCacheManager cacheMgr,
- ICacheEventLogger cacheEventLogger,
- IElementSerializer elementSerializer )
+ private synchronized UDPDiscoveryService createDiscoveryService( ITCPLateralCacheAttributes lac,
+ LateralCacheNoWaitFacade lcnwf,
+ ICompositeCacheManager cacheMgr,
+ ICacheEventLogger cacheEventLogger,
+ IElementSerializer elementSerializer )
{
UDPDiscoveryService discovery = null;
// create the UDP discovery for the TCP lateral
if ( lac.isUdpDiscoveryEnabled() )
{
- // TODO this will create one for each region, but one can be used for all regions
- LateralTCPDiscoveryListener discoveryListener = new LateralTCPDiscoveryListener( cacheMgr,
- cacheEventLogger,
- elementSerializer );
+ if ( lateralTCPDiscoveryListenerManager == null )
+ {
+ lateralTCPDiscoveryListenerManager = new LateralTCPDiscoveryListenerManager();
+ }
+
+ // One can be used for all regions
+ LateralTCPDiscoveryListener discoveryListener = lateralTCPDiscoveryListenerManager
+ .getDiscoveryListener( lac, cacheMgr, cacheEventLogger, elementSerializer );
- discoveryListener.addNoWaitFacade( lcnwf, lac.getCacheName() );
+ discoveryListener.addNoWaitFacade( lac.getCacheName(), lcnwf );
// need a factory for this so it doesn't
// get dereferenced, also we don't want one for every region.
@@ -187,7 +204,7 @@
if ( log.isInfoEnabled() )
{
- log.info( "Created UDPDiscoveryService for TCP lateral cache." );
+ log.info( "Registered TCP lateral cache [" + lac.getCacheName() + "] with UDPDiscoveryService." );
}
}
return discovery;
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheManager.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheManager.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheManager.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPCacheManager.java Wed Jul 22 21:13:38 2009
@@ -104,18 +104,19 @@
{
synchronized ( instances )
{
- LateralTCPCacheManager ins = (LateralTCPCacheManager) instances.get( lca.toString() );
+ String key = lca.getTcpServer();
+ LateralTCPCacheManager ins = (LateralTCPCacheManager) instances.get( key );
if ( ins == null )
{
- log.info( "Instance for [" + lca.toString() + "] is null, creating" );
+ log.info( "Instance for [" + key + "] is null, creating" );
- ins = (LateralTCPCacheManager) instances.get( lca.toString() );
+ ins = (LateralTCPCacheManager) instances.get( lca.getTcpServer() );
if ( ins == null )
{
ins = new LateralTCPCacheManager( lca, cacheMgr, cacheEventLogger, elementSerializer );
- instances.put( lca.toString(), ins );
+ instances.put( key, ins );
}
-
+
createMonitor( ins );
}
ins.clients++;
@@ -153,8 +154,8 @@
* <p>
* @param lcaA
* @param cacheMgr
- * @param cacheEventLogger
- * @param elementSerializer
+ * @param cacheEventLogger
+ * @param elementSerializer
*/
private LateralTCPCacheManager( ITCPLateralCacheAttributes lcaA, ICompositeCacheManager cacheMgr,
ICacheEventLogger cacheEventLogger, IElementSerializer elementSerializer )
@@ -217,7 +218,7 @@
{
synchronized ( this.caches )
{
- this.lateralWatch.addCacheListener( cacheName, listener );
+ lateralWatch.addCacheListener( cacheName, listener );
}
}
@@ -236,9 +237,9 @@
public AuxiliaryCache getCache( String cacheName )
{
LateralCacheNoWait lateralNoWait = null;
- synchronized ( this.caches )
+ synchronized ( caches )
{
- lateralNoWait = (LateralCacheNoWait) this.caches.get( cacheName );
+ lateralNoWait = (LateralCacheNoWait) caches.get( cacheName );
if ( lateralNoWait == null )
{
LateralCacheAttributes attr = (LateralCacheAttributes) lca.copy();
@@ -257,22 +258,35 @@
lateralNoWait.setCacheEventLogger( cacheEventLogger );
lateralNoWait.setElementSerializer( elementSerializer );
- this.caches.put( cacheName, lateralNoWait );
+ caches.put( cacheName, lateralNoWait );
if ( log.isInfoEnabled() )
{
- log.info( "Created LateralCacheNoWait for [" + this.lca + "] LateralCacheNoWait = ["
- + lateralNoWait + "]" );
+ log.info( "Created LateralCacheNoWait for [" + lca + "] LateralCacheNoWait = [" + lateralNoWait
+ + "]" );
}
+
+ // This used to be called on every getCache; I moved it in here.
+ addListenerIfNeeded( cacheName );
}
}
+ return lateralNoWait;
+ }
+
+ /**
+ * TODO see if this belongs here or in the factory.
+ * <p>
+ * @param cacheName
+ */
+ private void addListenerIfNeeded( String cacheName )
+ {
// don't create a listener if we are not receiving.
if ( lca.isReceive() )
{
try
{
- addLateralCacheListener( cacheName, LateralTCPListener.getInstance( this.lca, cacheMgr ) );
+ addLateralCacheListener( cacheName, LateralTCPListener.getInstance( lca, cacheMgr ) );
}
catch ( IOException ioe )
{
@@ -290,9 +304,6 @@
log.debug( "Not creating a listener since we are not receiving." );
}
}
- // TODO: need listener repair
-
- return lateralNoWait;
}
/**
@@ -322,4 +333,21 @@
}
return service;
}
+
+ /**
+ * Shuts down the lateral.
+ */
+ public void shutdown()
+ {
+ // TODO revisit this.
+ // the name here doesn't matter.
+ try
+ {
+ lateralService.dispose( "ALL" );
+ }
+ catch ( IOException e )
+ {
+ log.error( "Problem disposing of service", e );
+ }
+ }
}
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListener.java Wed Jul 22 21:13:38 2009
@@ -1,6 +1,7 @@
package org.apache.jcs.auxiliary.lateral.socket.tcp;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -33,7 +34,7 @@
* Map of no wait facades. these are used to determine which regions are locally configured to
* use laterals.
*/
- private Map facades = new HashMap();
+ private Map facades = Collections.synchronizedMap( new HashMap() );
/** The cache manager. */
private ICompositeCacheManager cacheMgr;
@@ -65,11 +66,11 @@
* This adds nowaits to a facade for the region name. If the region has no facade, then it is
* not configured to use the lateral cache, and no facade will be created.
* <p>
- * @param facade
+ * @param facade - facade (for region) => multiple lateral clients.
* @param cacheName
* @return true if the facade was not already registered.
*/
- public synchronized boolean addNoWaitFacade( LateralCacheNoWaitFacade facade, String cacheName )
+ public synchronized boolean addNoWaitFacade( String cacheName, LateralCacheNoWaitFacade facade )
{
boolean isNew = !facades.containsKey( cacheName );
@@ -109,8 +110,8 @@
{
if ( log.isInfoEnabled() )
{
- log.info( "Different nodes are configured differently. Region [" + noWait.getCacheName()
- + "] is not configured to use the lateral cache." );
+ log.info( "addNoWait > Different nodes are configured differently or region [" + noWait.getCacheName()
+ + "] is not yet used on this side. " );
}
}
}
@@ -131,18 +132,18 @@
if ( facade != null )
{
- boolean isNew = facade.addNoWait( noWait );
+ boolean removed = facade.removeNoWait( noWait );
if ( log.isDebugEnabled() )
{
- log.debug( "Called addNoWait, isNew = " + isNew );
+ log.debug( "Called removeNoWait, removed " + removed );
}
}
else
{
if ( log.isInfoEnabled() )
{
- log.info( "Different nodes are configured differently. Region [" + noWait.getCacheName()
- + "] is not configured to use the lateral cache." );
+ log.info( "removeNoWait > Different nodes are configured differently or region ["
+ + noWait.getCacheName() + "] is not yet used on this side. " );
}
}
}
@@ -150,6 +151,17 @@
/**
* Creates the lateral cache if needed.
* <p>
+ * We could go to the composite cache manager and get the cache for the region. This would
+ * force a full configuration of the region. One advantage of this would be that the creation of
+ * the lateral would go through the factory, which would add the item to the no wait list. But we
+ * don't want to do this. This would force this client to have all the regions as the other.
+ * This might not be desired. We don't want to send or receive for a region here that is either
+ * not used or not configured to use the lateral.
+ * <p>
+ * Right now, I'm afraid that the region will get puts if another instance has the region
+ * configured to use the lateral and our address is configured. This might be a bug, but it
+ * shouldn't happen with discovery.
+ * <p>
* @param service
*/
public void addDiscoveredService( DiscoveredService service )
@@ -202,6 +214,9 @@
/**
* Removes the lateral cache.
* <p>
+ * We need to tell the manager that this instance is bad, so it will reconnect the sender if it
+ * comes back.
+ * <p>
* @param service
*/
public void removeDiscoveredService( DiscoveredService service )
Added: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListenerManager.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListenerManager.java?rev=796882&view=auto
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListenerManager.java (added)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPDiscoveryListenerManager.java Wed Jul 22 21:13:38 2009
@@ -0,0 +1,70 @@
+package org.apache.jcs.auxiliary.lateral.socket.tcp;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
+import org.apache.jcs.engine.behavior.ICompositeCacheManager;
+import org.apache.jcs.engine.behavior.IElementSerializer;
+import org.apache.jcs.engine.logging.behavior.ICacheEventLogger;
+
+/**
+ * The factory holds an instance of this manager. This manager has a map of listeners, keyed by
+ * the discovery configuration (UDP discovery address and port). The map is deliberately not
+ * static, since the goal is to make JCS multi-instance.
+ * <p>
+ * During configuration, the factory is only created once per auxiliary definition. Two different
+ * laterals cannot use the same discovery service. We will likely want to change this.
+ */
+public class LateralTCPDiscoveryListenerManager
+{
+    /** Map of available listeners, keyed by "discoveryAddr:discoveryPort". Note, this is not static. */
+    protected Map instances = Collections.synchronizedMap( new HashMap() );
+
+    /** The logger */
+    private final static Log log = LogFactory.getLog( LateralTCPDiscoveryListenerManager.class );
+
+    /** Logs the creation of the manager; no other setup is performed. */
+    public LateralTCPDiscoveryListenerManager()
+    {
+        if ( log.isInfoEnabled() )
+        {
+            log.info( "Creating new LateralTCPDiscoveryListenerManager" );
+        }
+    }
+
+    /**
+     * Gets, creating it on first use, the listener registered for this UDP discovery address and port.
+     * <p>
+     * @param ilca ITCPLateralCacheAttributes supplying the UDP discovery address and port
+     * @param cacheMgr passed to a newly created listener
+     * @param cacheEventLogger passed to a newly created listener
+     * @param elementSerializer passed to a newly created listener
+     * @return the listener stored for the discovery configuration, never null
+     */
+    public synchronized LateralTCPDiscoveryListener getDiscoveryListener( ITCPLateralCacheAttributes ilca,
+                                                                          ICompositeCacheManager cacheMgr,
+                                                                          ICacheEventLogger cacheEventLogger,
+                                                                          IElementSerializer elementSerializer )
+    {
+        String key = ilca.getUdpDiscoveryAddr() + ":" + ilca.getUdpDiscoveryPort();
+        LateralTCPDiscoveryListener ins = (LateralTCPDiscoveryListener) instances.get( key );
+
+        if ( ins == null )
+        {
+            ins = new LateralTCPDiscoveryListener( cacheMgr, cacheEventLogger, elementSerializer );
+            // Store under the same key used for the lookup; otherwise the instance is never found again.
+            instances.put( key, ins );
+
+            if ( log.isDebugEnabled() )
+            {
+                log.debug( "Created new discovery listener for [" + key + "]" );
+            }
+        }
+
+        return ins;
+    }
+}
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPListener.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPListener.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPListener.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/auxiliary/lateral/socket/tcp/LateralTCPListener.java Wed Jul 22 21:13:38 2009
@@ -37,6 +37,7 @@
import org.apache.jcs.auxiliary.lateral.socket.tcp.behavior.ITCPLateralCacheAttributes;
import org.apache.jcs.engine.behavior.ICacheElement;
import org.apache.jcs.engine.behavior.ICompositeCacheManager;
+import org.apache.jcs.engine.behavior.IShutdownObserver;
import org.apache.jcs.engine.control.CompositeCache;
import org.apache.jcs.engine.control.CompositeCacheManager;
@@ -49,7 +50,7 @@
* passed to a pooled executor which then calls the appropriate handle method.
*/
public class LateralTCPListener
- implements ILateralCacheListener, Serializable
+ implements ILateralCacheListener, Serializable, IShutdownObserver
{
/** Don't change. */
private static final long serialVersionUID = -9107062664967131738L;
@@ -93,6 +94,9 @@
*/
private long listenerId = LateralCacheInfo.listenerId;
+ /** is this shut down? */
+ private boolean shutdown = false;
+
/**
* Gets the instance attribute of the LateralCacheTCPListener class.
* <p>
@@ -115,9 +119,9 @@
instances.put( String.valueOf( ilca.getTcpListenerPort() ), ins );
- if ( log.isDebugEnabled() )
+ if ( log.isInfoEnabled() )
{
- log.debug( "created new listener " + ilca.getTcpListenerPort() );
+ log.info( "Created new listener " + ilca.getTcpListenerPort() );
}
}
@@ -322,7 +326,7 @@
return getCache( cacheName ).localGetMatching( pattern );
}
-
+
/**
* Right now this does nothing.
* <p>
@@ -333,12 +337,10 @@
{
if ( log.isInfoEnabled() )
{
- log.info( "handleDispose > cacheName=" + cacheName );
+ log.info( "handleDispose > cacheName=" + cacheName + " | Ignoring message. Do not dispose from remote." );
}
// TODO handle active deregistration, rather than passive detection
- // through error
- // getCacheManager().freeCache( cacheName, true );
}
/**
@@ -443,7 +445,7 @@
ConnectionHandler handler;
- while ( true )
+ while ( !shutdown )
{
if ( log.isDebugEnabled() )
{
@@ -538,11 +540,11 @@
}
catch ( java.io.EOFException e )
{
- log.info( "Caught java.io.EOFException closing connection." );
+ log.info( "Caught java.io.EOFException closing connection." + e.getMessage() );
}
catch ( java.net.SocketException e )
{
- log.info( "Caught java.net.SocketException closing connection." );
+ log.info( "Caught java.net.SocketException closing connection." + e.getMessage() );
}
catch ( Exception e )
{
@@ -628,7 +630,7 @@
}
else if ( led.command == LateralElementDescriptor.GET_MATCHING )
{
- Map obj = handleGetMatching( cacheName, (String)key );
+ Map obj = handleGetMatching( cacheName, (String) key );
ObjectOutputStream oos = new ObjectOutputStream( socket.getOutputStream() );
@@ -661,4 +663,28 @@
return t;
}
}
+
+ /**
+ * Shuts down the receiver.
+ */
+ public void shutdown()
+ {
+ if ( !shutdown )
+ {
+ shutdown = true;
+
+ if ( log.isInfoEnabled() )
+ {
+ log.info( "Shutting down TCP Lateral receiver." );
+ }
+ receiver.interrupt();
+ }
+ else
+ {
+ if ( log.isDebugEnabled() )
+ {
+ log.debug( "Shutdown already called." );
+ }
+ }
+ }
}
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryManager.java Wed Jul 22 21:13:38 2009
@@ -94,10 +94,12 @@
attributes.setServicePort( servicePort );
service = new UDPDiscoveryService( attributes, cacheEventLogger );
-
- // register for shutdown notification
- ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( service );
+ // register for shutdown notification
+ if ( cacheMgr instanceof IShutdownObservable )
+ {
+ ( (IShutdownObservable) cacheMgr ).registerShutdownObserver( service );
+ }
services.put( key, service );
}
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoverySender.java Wed Jul 22 21:13:38 2009
@@ -68,9 +68,9 @@
{
try
{
- if ( log.isInfoEnabled() )
+ if ( log.isDebugEnabled() )
{
- log.info( "Constructing socket for sender." );
+ log.debug( "Constructing socket for sender on port [" + port + "]" );
}
localSocket = new MulticastSocket( port );
Modified: jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java?rev=796882&r1=796881&r2=796882&view=diff
==============================================================================
--- jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java (original)
+++ jakarta/jcs/trunk/src/java/org/apache/jcs/utils/discovery/UDPDiscoveryService.java Wed Jul 22 21:13:38 2009
@@ -116,8 +116,6 @@
+ getUdpDiscoveryAttributes().getUdpDiscoveryPort() + "] we won't be able to find any other caches", e );
}
- // todo only do the passive if receive is enabled, perhaps set the
- // myhost to null or something on the request
if ( senderDaemon == null )
{
senderDaemon = new ClockDaemon();
@@ -125,16 +123,15 @@
}
// create a sender thread
- sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), this.getCacheNames() );
-
- senderDaemon.executePeriodically( 30 * 1000, sender, false );
+ sender = new UDPDiscoverySenderThread( getUdpDiscoveryAttributes(), getCacheNames() );
+ senderDaemon.executePeriodically( 15 * 1000, sender, true );
// add the cleanup daemon too
cleanup = new UDPCleanupRunner( this );
// I'm going to use this as both, but it could happen
- // that something could hang around twice the time suing this as the
+ // that something could hang around twice the time using this as the
// delay and the idle time.
- senderDaemon.executePeriodically( this.getUdpDiscoveryAttributes().getMaxIdleTimeSec() * 1000, cleanup, false );
+ senderDaemon.executePeriodically( getUdpDiscoveryAttributes().getMaxIdleTimeSec() * 1000, cleanup, false );
// add shutdown hook that will issue a remove call.
DiscoveryShutdownHook shutdownHook = new DiscoveryShutdownHook( this );
@@ -208,7 +205,15 @@
*/
public void removeDiscoveredService( DiscoveredService service )
{
- getDiscoveredServices().remove( service );
+ boolean contained = getDiscoveredServices().remove( service );
+
+ if ( contained )
+ {
+ if ( log.isInfoEnabled() )
+ {
+ log.info( "Removing " + service );
+ }
+ }
Iterator it = getDiscoveryListeners().iterator();
while ( it.hasNext() )
Added: jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacadeUnitTest.java
URL: http://svn.apache.org/viewvc/jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacadeUnitTest.java?rev=796882&view=auto
==============================================================================
--- jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacadeUnitTest.java (added)
+++ jakarta/jcs/trunk/src/test/org/apache/jcs/auxiliary/lateral/LateralCacheNoWaitFacadeUnitTest.java Wed Jul 22 21:13:38 2009
@@ -0,0 +1,121 @@
+package org.apache.jcs.auxiliary.lateral;
+
+import org.apache.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for LateralCacheNoWaitFacade.
+ */
+public class LateralCacheNoWaitFacadeUnitTest
+ extends TestCase
+{
+ /**
+ * Verify that we can remove an item.
+ */
+ public void testAddThenRemoveNoWait_InList()
+ {
+ // SETUP
+ LateralCacheNoWait[] noWaits = new LateralCacheNoWait[0];
+ ILateralCacheAttributes cattr = new LateralCacheAttributes();
+ cattr.setCacheName( "testCache1" );
+
+ LateralCacheNoWaitFacade facade = new LateralCacheNoWaitFacade( noWaits, cattr );
+
+ LateralCache cache = new LateralCache( cattr );
+ LateralCacheNoWait noWait = new LateralCacheNoWait( cache );
+
+ // DO WORK
+ facade.addNoWait( noWait );
+
+ // VERIFY
+ assertTrue( "Should be in the list.", facade.containsNoWait( noWait ) );
+
+ // DO WORK
+ facade.removeNoWait( noWait );
+
+ // VERIFY
+ assertEquals( "Should have 0", 0, facade.noWaits.length );
+ assertFalse( "Should not be in the list. ", facade.containsNoWait( noWait ) );
+ }
+
+ /**
+ * Verify that removing one of two added items leaves only the other in the list.
+ */
+ public void testAddThenRemoveNoWait_InListSize2()
+ {
+ // SETUP
+ LateralCacheNoWait[] noWaits = new LateralCacheNoWait[0];
+ ILateralCacheAttributes cattr = new LateralCacheAttributes();
+ cattr.setCacheName( "testCache1" );
+
+ LateralCacheNoWaitFacade facade = new LateralCacheNoWaitFacade( noWaits, cattr );
+
+ LateralCache cache = new LateralCache( cattr );
+ LateralCacheNoWait noWait = new LateralCacheNoWait( cache );
+ LateralCacheNoWait noWait2 = new LateralCacheNoWait( cache );
+
+ // DO WORK
+ facade.addNoWait( noWait );
+ facade.addNoWait( noWait2 );
+
+ // VERIFY
+ assertEquals( "Should have 2", 2, facade.noWaits.length );
+ assertTrue( "Should be in the list.", facade.containsNoWait( noWait ) );
+ assertTrue( "Should be in the list.", facade.containsNoWait( noWait2 ) );
+
+ // DO WORK
+ facade.removeNoWait( noWait );
+
+ // VERIFY
+ assertEquals( "Should only have 1", 1, facade.noWaits.length );
+ assertFalse( "Should not be in the list. ", facade.containsNoWait( noWait ) );
+ assertTrue( "Should be in the list.", facade.containsNoWait( noWait2 ) );
+ }
+
+ /**
+ * Verify that adding the same item twice results in only one entry in the list.
+ */
+ public void testAdd_InList()
+ {
+ // SETUP
+ LateralCacheNoWait[] noWaits = new LateralCacheNoWait[0];
+ ILateralCacheAttributes cattr = new LateralCacheAttributes();
+ cattr.setCacheName( "testCache1" );
+
+ LateralCacheNoWaitFacade facade = new LateralCacheNoWaitFacade( noWaits, cattr );
+
+ LateralCache cache = new LateralCache( cattr );
+ LateralCacheNoWait noWait = new LateralCacheNoWait( cache );
+
+ // DO WORK
+ facade.addNoWait( noWait );
+ facade.addNoWait( noWait );
+
+ // VERIFY
+ assertTrue( "Should be in the list.", facade.containsNoWait( noWait ) );
+ assertEquals( "Should only have 1", 1, facade.noWaits.length );
+ }
+
+ /**
+ * Verify that removing an item that was never added leaves it absent from the list.
+ */
+ public void testAddThenRemoveNoWait_NotInList()
+ {
+ // SETUP
+ LateralCacheNoWait[] noWaits = new LateralCacheNoWait[0];
+ ILateralCacheAttributes cattr = new LateralCacheAttributes();
+ cattr.setCacheName( "testCache1" );
+
+ LateralCacheNoWaitFacade facade = new LateralCacheNoWaitFacade( noWaits, cattr );
+
+ LateralCache cache = new LateralCache( cattr );
+ LateralCacheNoWait noWait = new LateralCacheNoWait( cache );
+
+ // DO WORK
+ facade.removeNoWait( noWait );
+
+ // VERIFY
+ assertFalse( "Should not be in the list.", facade.containsNoWait( noWait ) );
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: jcs-dev-help@jakarta.apache.org