You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jcs-dev@jakarta.apache.org by as...@apache.org on 2004/05/14 06:59:40 UTC
cvs commit: jakarta-turbine-jcs/src/java/org/apache/jcs/auxiliary/javagroups JavaGroupsCacheFactory.java JavaGroupsCacheAttributes.java JavaGroupsCache.java
asmuts 2004/05/13 21:59:40
Added: src/java/org/apache/jcs/auxiliary/javagroups
JavaGroupsCacheFactory.java
JavaGroupsCacheAttributes.java JavaGroupsCache.java
Log:
trying to figure out what to add to this from the other. need to prevent blocking on startup.
Revision Changes Path
1.1 jakarta-turbine-jcs/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheFactory.java
Index: JavaGroupsCacheFactory.java
===================================================================
package org.apache.jcs.auxiliary.javagroups;
/*
* Copyright 2001-2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.jcs.auxiliary.AuxiliaryCacheFactory;
import org.apache.jcs.auxiliary.AuxiliaryCache;
import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
import org.apache.jcs.engine.control.CompositeCacheManager;
import org.apache.jcs.engine.control.CompositeCache;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jgroups.ChannelFactory;
import org.jgroups.Channel;
/**
 * AuxiliaryCacheFactory for creating instances of {@link JavaGroupsCache}
 * for a particular CompositeCache and {@link JavaGroupsCacheAttributes}.
 * <p>
 * For each cache the factory instantiates the {@link ChannelFactory} named
 * by the attributes, asks it for a {@link Channel} and hands that channel to
 * a new {@link JavaGroupsCache}.
 *
 * @version $Id: JavaGroupsCacheFactory.java,v 1.1 2004/05/14 04:59:40 asmuts Exp $
 */
public class JavaGroupsCacheFactory implements AuxiliaryCacheFactory
{
    private final static Log log =
        LogFactory.getLog( JavaGroupsCacheFactory.class );

    private String name;

    /**
     * Creates a {@link JavaGroupsCache} for the given composite cache.
     *
     * @param iaca Attributes for the auxiliary; must be an instance of
     *             {@link JavaGroupsCacheAttributes}
     * @param cache The CompositeCache the new auxiliary is associated with
     * @return The new auxiliary, or <code>null</code> if anything went wrong
     *         while creating it (the failure is logged, not rethrown)
     */
    public AuxiliaryCache createCache( AuxiliaryCacheAttributes iaca,
                                       CompositeCache cache )
    {
        // Declared outside the try block so the failure path can release it.
        Channel channel = null;

        try
        {
            // Cast provided attributes to JavaGroupsCacheAttributes

            JavaGroupsCacheAttributes attributes =
                ( JavaGroupsCacheAttributes ) iaca;

            // Create a ChannelFactory using the classname specified in the
            // config as 'channelFactoryClassName'

            ChannelFactory factory = ( ChannelFactory ) Class.forName(
                attributes.getChannelFactoryClassName() ).newInstance();

            // Create a channel based on 'channelProperties' from the config

            channel =
                factory.createChannel( attributes.getJGChannelProperties() );

            // Return a new JavaGroupsCache for the new channel.

            return new JavaGroupsCache( cache,
                                        channel,
                                        attributes.isGetFromPeers() );
        }
        catch ( Exception e )
        {
            log.error( "Failed to create JavaGroupsCache", e );

            // The channel was created only for the cache that could not be
            // built; nobody else holds a reference, so close it here rather
            // than leak it.
            closeQuietly( channel );

            return null;
        }
    }

    /**
     * Closes the given channel, logging (instead of propagating) any failure
     * so that the original error handling is not disturbed.
     *
     * @param channel Channel to close, may be <code>null</code>
     */
    private static void closeQuietly( Channel channel )
    {
        if ( channel == null )
        {
            return;
        }

        try
        {
            channel.close();
        }
        catch ( Exception e )
        {
            log.warn( "Failed to close channel after cache creation failure",
                      e );
        }
    }

    /**
     * Accessor for name property
     *
     * @return The name assigned through {@link #setName}, or
     *         <code>null</code> if none was set
     */
    public String getName()
    {
        return this.name;
    }

    /**
     * Mutator for name property
     *
     * @param name New value for the name property
     */
    public void setName( String name )
    {
        this.name = name;
    }
}
1.1 jakarta-turbine-jcs/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheAttributes.java
Index: JavaGroupsCacheAttributes.java
===================================================================
package org.apache.jcs.auxiliary.javagroups;
/*
* Copyright 2001-2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
/**
 * Attributes used by {@link JavaGroupsCacheFactory#createCache} to configure
 * an instance of {@link JavaGroupsCache}.
 *
 * <h3> Configurable Properties: </h3>
 *
 * <dl>
 *   <dt>channelFactoryClassName</dt>
 *   <dd>
 *     Name of an {@link org.jgroups.ChannelFactory} implementation which
 *     will be used to create the channel for the instance. Defaults to
 *     {@link org.jgroups.JChannelFactory}.
 *   </dd>
 *   <dt>channelProperties</dt>
 *   <dd>
 *     A JavaGroups properties object which will be used by the channel to
 *     create the protocol stack. Either a properties string, or the URL of
 *     a file containing the properties in XML form is valid. Defaults to null
 *     which causes the Channel implementation to use its defaults.
 *   </dd>
 *   <dt>getFromPeers</dt>
 *   <dd>
 *     Passed to the {@link JavaGroupsCache} constructor by the factory; when
 *     true, a get on the auxiliary asks the other members of the group for
 *     the element. Defaults to false.
 *   </dd>
 * </dl>
 *
 * @version $Id: JavaGroupsCacheAttributes.java,v 1.1 2004/05/14 04:59:40 asmuts Exp $
 */
public class JavaGroupsCacheAttributes implements AuxiliaryCacheAttributes
{
    private String cacheName;
    private String name;

    private String channelFactoryClassName = "org.jgroups.JChannelFactory";
    private String channelProperties = null;
    private boolean getFromPeers = false;

    /**
     * Accessor for channelFactoryClassName property.
     *
     * @return Class name of the ChannelFactory implementation to use
     */
    public String getChannelFactoryClassName()
    {
        return channelFactoryClassName;
    }

    /**
     * Mutator for channelFactoryClassName property.
     *
     * @param channelFactoryClassName Class name of the ChannelFactory
     *                                implementation to use
     */
    public void setChannelFactoryClassName( String channelFactoryClassName )
    {
        this.channelFactoryClassName = channelFactoryClassName;
    }

    /**
     * Accessor for channelProperties property. Note that the getter is named
     * differently from its setter, {@link #setChannelProperties}.
     *
     * @return The channel properties, or <code>null</code> if none were set
     */
    public String getJGChannelProperties()
    {
        return channelProperties;
    }

    /**
     * Mutator for channelProperties property.
     *
     * @param channelProperties The channel properties, may be
     *                          <code>null</code>
     */
    public void setChannelProperties( String channelProperties )
    {
        this.channelProperties = channelProperties;
    }

    /**
     * Accessor for getFromPeers property.
     *
     * @return Current value of the getFromPeers flag
     */
    public boolean isGetFromPeers()
    {
        return getFromPeers;
    }

    /**
     * Mutator for getFromPeers property.
     *
     * @param getFromPeers New value of the getFromPeers flag
     */
    public void setGetFromPeers( boolean getFromPeers )
    {
        this.getFromPeers = getFromPeers;
    }

    // ----------------------------------------------- AuxiliaryCacheAttributes

    /**
     * Accessor for cacheName property.
     */
    public String getCacheName()
    {
        return this.cacheName;
    }

    /**
     * Mutator for cacheName property.
     */
    public void setCacheName( String s )
    {
        this.cacheName = s;
    }

    /**
     * Accessor for name property.
     */
    public String getName()
    {
        return this.name;
    }

    /**
     * Mutator for name property.
     */
    public void setName( String name )
    {
        this.name = name;
    }

    /**
     * Return a copy of this JavaGroupsCacheAttributes, cast to an
     * AuxiliaryCacheAttributes
     */
    public AuxiliaryCacheAttributes copy()
    {
        return ( AuxiliaryCacheAttributes ) this.clone();
    }

    /**
     * Return a clone of this JavaGroupsCacheAttributes. Every property is
     * carried over to the new instance, including getFromPeers.
     */
    public Object clone()
    {
        JavaGroupsCacheAttributes copy = new JavaGroupsCacheAttributes();

        copy.cacheName = this.cacheName;
        copy.name = this.name;

        copy.channelFactoryClassName = this.channelFactoryClassName;
        copy.channelProperties = this.channelProperties;

        // Previously omitted, which made every copy fall back to the
        // default (false) regardless of the configured value.
        copy.getFromPeers = this.getFromPeers;

        return copy;
    }
}
1.1 jakarta-turbine-jcs/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCache.java
Index: JavaGroupsCache.java
===================================================================
package org.apache.jcs.auxiliary.javagroups;
/*
* Copyright 2001-2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.jcs.auxiliary.AuxiliaryCache;
import org.apache.jcs.engine.CacheConstants;
import org.apache.jcs.engine.CacheElement;
import org.apache.jcs.engine.behavior.ICacheElement;
import org.apache.jcs.engine.behavior.ICacheType;
import org.apache.jcs.engine.control.CompositeCache;
import org.jgroups.Channel;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.Address;
import org.jgroups.MembershipListener;
import org.jgroups.util.RspList;
import org.jgroups.blocks.RequestHandler;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.MessageDispatcher;
import java.io.IOException;
import java.io.Serializable;
import java.util.Set;
import java.util.Vector;
/**
 * Auxiliary cache using javagroups. Expects to be created with a Channel,
 * the {@link JavaGroupsCacheFactory} is responsible for creating that channel.
 * To do so it uses configuration properties specified by an instance of
 * {@link JavaGroupsCacheAttributes}.
 * <p>
 * At creation time the provided channel is connected to a group having the
 * same name as the cache / region name this auxiliary is associated with.
 * update / remove / removeAll operations are broadcast to all members of the
 * group. Requests received from other members of the group are passed to
 * {@link #handle}, which dispatches to the appropriate methods on the
 * associated CompositeCache. </p>
 * <p>
 * Calls to get return null unless the cache was created with
 * 'getFromPeers' set to true; in that case the request is cast to the group
 * and the call waits for the responses.
 * <p>
 * update / remove / removeAll messages are sent without waiting for any
 * response (<code>GroupRequest.GET_NONE</code>). Waiting for responses on
 * those operations as well could provide much higher cache consistency (but
 * with a substantial speed penalty).
 *
 * @version $Id: JavaGroupsCache.java,v 1.1 2004/05/14 04:59:40 asmuts Exp $
 */
public class JavaGroupsCache
    implements AuxiliaryCache, RequestHandler, MembershipListener
{
    private final static Log log = LogFactory.getLog( JavaGroupsCache.class );

    private String cacheName;
    private int status;

    private boolean getFromPeers;

    private CompositeCache cache;

    private Channel channel;

    private MessageDispatcher dispatcher;

    /**
     * Creates the auxiliary and connects the channel to the group named
     * after the cache.
     *
     * @param cache CompositeCache this auxiliary is associated with
     * @param channel Channel used to talk to the other members of the group
     * @param getFromPeers Whether {@link #get} should ask the peers
     * @throws Exception If the channel could not be connected; the dispatcher
     *                   and the channel are released before the exception is
     *                   rethrown
     */
    public JavaGroupsCache( CompositeCache cache,
                            Channel channel,
                            boolean getFromPeers )
        throws Exception
    {
        this.cache = cache;

        this.cacheName = cache.getCacheName();
        this.channel = channel;

        this.getFromPeers = getFromPeers;

        // The dispatcher is attached to the channel with this object as both
        // MembershipListener and RequestHandler; no MessageListener is
        // registered (null).

        dispatcher = new MessageDispatcher( channel, null, this, this );

        try
        {
            // Connect channel to the 'group' for our region name

            // NOTE(review): LOCAL = FALSE presumably stops this member from
            // receiving its own broadcasts -- confirm against JGroups docs.
            channel.setOpt( Channel.LOCAL, Boolean.FALSE );

            channel.connect( cacheName );
        }
        catch ( Exception e )
        {
            // Do not leave a running dispatcher / open channel behind when
            // the cache cannot be brought up.
            try
            {
                releaseResources();
            }
            catch ( Exception cleanupFailure )
            {
                log.warn( "Failed to release resources after connect failure",
                          cleanupFailure );
            }

            throw e;
        }

        // If all the above succeed, the cache is now alive.

        this.status = CacheConstants.STATUS_ALIVE;

        log.info( "Initialized for cache: " + cacheName );
    }

    /**
     * Wraps the element and command in a {@link Request} and casts it to the
     * group without waiting for responses. Failures are logged and not
     * propagated.
     *
     * @param element Element the command applies to
     * @param command One of the command constants defined by {@link Request}
     */
    public void send( ICacheElement element, int command )
    {
        Request request = new Request( element, command );

        try
        {
            dispatcher.castMessage( null,
                                    new Message( null, null, request ),
                                    GroupRequest.GET_NONE,
                                    0 );

        }
        catch ( Exception e )
        {
            log.error( "Failed to send JavaGroups message", e );
        }
    }

    // ----------------------------------------------- interface AuxiliaryCache

    /**
     * Sends the provided element to all peers (connected to the same channel
     * and region name).
     *
     * @param ce CacheElement to replicate
     * @throws IOException Never thrown by this implementation
     */
    public void update( ICacheElement ce ) throws IOException
    {
        send( ce, Request.UPDATE );
    }

    /**
     * If 'getFromPeers' is true, this will attempt to get the requested
     * element from any other members of the group.
     *
     * @param key Key of the element to look up
     * @return The first cache element found among the responses, or
     *         <code>null</code> if 'getFromPeers' is false or no peer
     *         returned an element
     * @throws IOException Never thrown by this implementation
     */
    public ICacheElement get( Serializable key ) throws IOException
    {
        if ( !getFromPeers )
        {
            return null;
        }

        CacheElement element = new CacheElement( cacheName, key, null );

        Request request = new Request( element, Request.GET );

        // Cast message and wait for all responses.

        // FIXME: we can stop waiting after the first not null response,
        //        that is more difficult to implement however.

        // NOTE(review): a timeout of 0 appears to mean "wait indefinitely"
        // in JGroups, so an unresponsive peer may block this call -- confirm.
        RspList responses =
            dispatcher.castMessage( null,
                                    new Message( null, null, request ),
                                    GroupRequest.GET_ALL,
                                    0 );

        if ( responses == null )
        {
            return null;
        }

        // Get results only gives the responses which were not null

        Vector results = responses.getResults();

        if ( results == null )
        {
            return null;
        }

        // Return the first response that really is a cache element; anything
        // else a peer may have answered with is skipped instead of causing a
        // ClassCastException.

        for ( int i = 0; i < results.size(); i++ )
        {
            Object candidate = results.get( i );

            if ( candidate instanceof ICacheElement )
            {
                return ( ICacheElement ) candidate;
            }
        }

        return null;
    }

    /**
     * Sends a request to all peers to remove the element having the provided
     * key.
     *
     * @param key Key of element to be removed
     * @return Always returns false
     * @throws IOException Never thrown by this implementation
     */
    public boolean remove( Serializable key ) throws IOException
    {
        CacheElement ce = new CacheElement( cacheName, key, null );

        send( ce, Request.REMOVE );

        return false;
    }

    /**
     * Sends a request to remove ALL elements from the peers
     *
     * @throws IOException Never thrown by this implementation
     */
    public void removeAll() throws IOException
    {
        CacheElement ce = new CacheElement( cacheName, null, null );

        send( ce, Request.REMOVE_ALL );
    }

    /**
     * Dispose this cache: stops the dispatcher, disconnects the channel from
     * the group and closes it. The status is set to disposed even if one of
     * those steps fails.
     *
     * @throws IOException Never thrown by this implementation
     */
    public void dispose() throws IOException
    {
        try
        {
            releaseResources();
        }
        finally
        {
            status = CacheConstants.STATUS_DISPOSED;
        }

        log.info( "Disposed for cache: " + cacheName );
    }

    /**
     * Stops the dispatcher, then disconnects and closes the channel. Each
     * step is attempted even if an earlier one throws, so a failure while
     * stopping the dispatcher cannot leave the channel open.
     */
    private void releaseResources()
    {
        try
        {
            dispatcher.stop();
        }
        finally
        {
            // Now we can disconnect from the group and close the channel

            try
            {
                channel.disconnect();
            }
            finally
            {
                channel.close();
            }
        }
    }

    /**
     * Since this is a lateral, size is not defined.
     *
     * @return Always returns 0
     */
    public int getSize()
    {
        return 0;
    }

    /**
     * Returns the status of this auxiliary.
     *
     * @return One of the status constants from {@link CacheConstants}
     */
    public int getStatus()
    {
        return status;
    }

    /**
     * Accessor for cacheName property
     *
     * @return Name of cache / region this auxiliary is associated with.
     */
    public String getCacheName()
    {
        return cacheName;
    }

    /**
     * Not implemented.
     *
     * @param group Ignored
     * @return Always returns null
     */
    public Set getGroupKeys( String group )
    {
        return null;
    }

    // --------------------------------------------------- interface ICacheType

    /**
     * Get the cache type (always Lateral).
     *
     * @return Always returns ICacheType.LATERAL_CACHE
     */
    public int getCacheType()
    {
        return ICacheType.LATERAL_CACHE;
    }

    // ----------------------------------------------- interface RequestHandler

    /**
     * Handles a message from a peer. The message should contain a Request,
     * and depending on the command this will call localGet, localUpdate,
     * localRemove, or localRemoveAll on the associated CompositeCache.
     * Any exception raised while processing is logged and not propagated.
     *
     * @param msg The JavaGroups Message
     * @return The result of localGet for a GET request; null for every other
     *         command and whenever processing fails
     */
    public Object handle( Message msg )
    {
        try
        {
            Request request = ( Request ) msg.getObject();

            // Switch based on the command and invoke the
            // appropriate method on the associate composite cache

            switch ( request.getCommand() )
            {
                case Request.GET:
                    // The value found locally is the response to the peer.
                    return cache.localGet( request.getCacheElement().getKey() );
                case Request.UPDATE:
                    cache.localUpdate( request.getCacheElement() );
                    break;
                case Request.REMOVE:
                    cache.localRemove( request.getCacheElement().getKey() );
                    break;
                case Request.REMOVE_ALL:
                    cache.localRemoveAll();
                    break;
                default:
                    log.error( "Recieved unknown command" );
            }
        }
        catch ( Exception e )
        {
            log.error( "Failed to process received JavaGroups message", e );
        }

        return null;
    }

    // ------------------------------------------- interface MembershipListener

    /**
     * Logs the new view; no other action is taken.
     *
     * @param view The view passed in by the dispatcher
     */
    public void viewAccepted( View view )
    {
        log.info( "View Changed: " + String.valueOf( view ) );
    }

    /** Intentionally does nothing. */
    public void suspect( Address suspectedAddress ) { }

    /** Intentionally does nothing. */
    public void block() { }

    // ---------------------------------------------------------- inner classes

    /**
     * Object for messages, wraps the command type (get, update, remove, or
     * remove all) and original cache element to distribute.
     * <p>
     * Instances are serialized and exchanged between peers, and the class
     * declares no serialVersionUID, so its fields and non-private members
     * are deliberately left exactly as they were.
     */
    static class Request implements Serializable
    {
        public final static int UPDATE = 1;
        public final static int REMOVE = 2;
        public final static int REMOVE_ALL = 3;
        public final static int GET = 5;

        private ICacheElement cacheElement;
        private int command;

        public Request( ICacheElement cacheElement, int command )
        {
            this.cacheElement = cacheElement;
            this.command = command;
        }

        public ICacheElement getCacheElement()
        {
            return cacheElement;
        }

        public int getCommand()
        {
            return command;
        }
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: turbine-jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: turbine-jcs-dev-help@jakarta.apache.org