You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@turbine.apache.org by as...@apache.org on 2002/02/24 07:15:42 UTC
cvs commit: jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/behavior ILateralCacheJGListener.java IJGConstants.java
asmuts 02/02/23 22:15:42
Added: src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups
LateralJGService.java LateralJGSender.java
LateralJGReceiverConnection.java
LateralJGReceiver.java
LateralGroupCacheJGListener.java
LateralCacheJGListener.java JGConnectionHolder.java
src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/utils
JGSocketOpener.java JGRpcOpener.java
src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/behavior
ILateralCacheJGListener.java IJGConstants.java
Log:
an interesting distribution system
some code problems, like hidden system outs
need the javagroups2.0.jar
Revision Changes Path
1.1 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/LateralJGService.java
Index: LateralJGService.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.javagroups;
/*
* The Apache Software License, Version 1.1
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Velocity", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
import java.io.*;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.Reader;
import java.io.Serializable;
import org.apache.stratum.jcs.auxiliary.lateral.LateralCacheAttributes;
import org.apache.stratum.jcs.auxiliary.lateral.LateralCacheInfo;
import org.apache.stratum.jcs.auxiliary.lateral.LateralElementDescriptor;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheObserver;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheService;
import org.apache.stratum.jcs.engine.CacheElement;
import org.apache.stratum.jcs.engine.behavior.ICacheElement;
import org.apache.stratum.jcs.engine.behavior.ICacheListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogSource;
/**
 * A lateral cache service implementation that forwards cache operations to
 * the other members of the lateral group through a {@link LateralJGSender}.
 * <p>
 * Updates and removals are wrapped in a {@link LateralElementDescriptor}
 * tagged with a command and the id of the requester, then handed to the
 * sender. The listener registration methods required by
 * {@link ILateralCacheObserver} are intentionally empty.
 *
 * @author <a href="mailto:asmuts@yahoo.com">Aaron Smuts</a>
 * @created February 19, 2002
 * @version $Id: LateralJGService.java,v 1.8 2002/02/17 07:16:24 asmuts Exp
 *      $
 */
public class LateralJGService
    implements ILateralCacheService, ILateralCacheObserver
{
    private final static Log log =
        LogSource.getInstance( LateralJGService.class );

    /** Configuration this service was created with; not read again here. */
    private ILateralCacheAttributes ilca;

    /** Sender every service method delegates to. */
    private LateralJGSender sender;

    /**
     * Creates the service and the sender it delegates to.
     *
     * @param lca configuration handed on to the {@link LateralJGSender}
     * @exception IOException if the sender cannot be created; the failure is
     *      logged (message only) and rethrown unchanged
     */
    public LateralJGService( ILateralCacheAttributes lca )
        throws IOException
    {
        this.ilca = lca;
        try
        {
            log.debug( "creating sender" );
            sender = new LateralJGSender( lca );
            log.debug( "created sender" );
        }
        catch ( IOException e )
        {
            // This gets thrown over and over in recovery mode, and the
            // stack trace isn't useful here, so only the message is logged.
            log.error( "Could not create sender to [" + lca.getUdpMulticastAddr() + "] -- " + e.getMessage() );
            throw e;
        }
    }

    // -------------------------------------------------------- Service Methods

    /**
     * Sends an update for the item using this process's listener id as the
     * requester.
     *
     * @param item element to distribute
     * @exception IOException if the sender fails
     */
    public void update( ICacheElement item )
        throws IOException
    {
        update( item, LateralCacheInfo.listenerId );
    }

    /**
     * Sends an UPDATE command for the item.
     *
     * @param item element to distribute
     * @param requesterId id recorded on the descriptor as the originator
     * @exception IOException if the sender fails
     */
    public void update( ICacheElement item, byte requesterId )
        throws IOException
    {
        LateralElementDescriptor led = new LateralElementDescriptor( item );
        led.requesterId = requesterId;
        led.command = led.UPDATE;
        sender.send( led );
    }

    /**
     * Sends a removal for the key using this process's listener id as the
     * requester.
     *
     * @param cacheName name of the cache region
     * @param key key to remove
     * @exception IOException if the sender fails
     */
    public void remove( String cacheName, Serializable key )
        throws IOException
    {
        remove( cacheName, key, LateralCacheInfo.listenerId );
    }

    /**
     * Sends a REMOVE command for the key. The descriptor carries a cache
     * element with a null value.
     *
     * @param cacheName name of the cache region
     * @param key key to remove
     * @param requesterId id recorded on the descriptor as the originator
     * @exception IOException if the sender fails
     */
    public void remove( String cacheName, Serializable key, byte requesterId )
        throws IOException
    {
        CacheElement ce = new CacheElement( cacheName, key, null );
        LateralElementDescriptor led = new LateralElementDescriptor( ce );
        led.requesterId = requesterId;
        led.command = led.REMOVE;
        sender.send( led );
    }

    /**
     * Does nothing.
     *
     * @exception IOException never thrown by this implementation
     */
    public void release()
        throws IOException
    {
        // nothing needs to be done
    }

    /**
     * Forwards the dispose request to the sender.
     *
     * @param cache name of the cache being disposed
     * @exception IOException if the sender fails
     */
    public void dispose( String cache )
        throws IOException
    {
        sender.dispose( cache );
    }

    /**
     * Not implemented: no request is sent.
     *
     * @return always null
     * @param key ignored
     * @exception IOException never thrown by this implementation
     */
    public Serializable get( String key )
        throws IOException
    {
        return null;
    }

    /**
     * Looks the key up in the lateral group.
     *
     * @return whatever {@link #get(String, Serializable, boolean)} returns
     * @param cacheName name of the cache region
     * @param key key to look up
     * @exception IOException if the sender fails
     */
    public Serializable get( String cacheName, Serializable key )
        throws IOException
    {
        return get( cacheName, key, true );
    }

    /**
     * An experimental get implementation. By default it should be off.
     * Sends a GET descriptor through the sender and returns its answer.
     *
     * @return the value returned by the sender's sendAndReceive
     * @param cacheName name of the cache region
     * @param key key to look up
     * @param container currently ignored
     * @exception IOException if the sender fails
     */
    public Serializable get( String cacheName, Serializable key, boolean container )
        throws IOException
    {
        CacheElement ce = new CacheElement( cacheName, key, null );
        LateralElementDescriptor led = new LateralElementDescriptor( ce );
        // the requester id is not set on GET descriptors yet
        led.command = led.GET;
        return sender.sendAndReceive( led );
    }

    /**
     * Sends a remove-all for the region using this process's listener id as
     * the requester.
     *
     * @param cacheName name of the cache region
     * @exception IOException if the sender fails
     */
    public void removeAll( String cacheName )
        throws IOException
    {
        removeAll( cacheName, LateralCacheInfo.listenerId );
    }

    /**
     * Sends a REMOVEALL command. The descriptor carries a placeholder
     * element whose key is the literal string "ALL".
     *
     * @param cacheName name of the cache region
     * @param requesterId id recorded on the descriptor as the originator
     * @exception IOException if the sender fails
     */
    public void removeAll( String cacheName, byte requesterId )
        throws IOException
    {
        CacheElement ce = new CacheElement( cacheName, "ALL", null );
        LateralElementDescriptor led = new LateralElementDescriptor( ce );
        led.requesterId = requesterId;
        led.command = led.REMOVEALL;
        sender.send( led );
    }

    /**
     * Manual test driver: reads lines from standard input and sends each one
     * as the value of a "test"/"test" cache element until input ends.
     *
     * @param args ignored
     */
    public static void main( String args[] )
    {
        try
        {
            LateralJGSender sender =
                new LateralJGSender( new LateralCacheAttributes() );

            BufferedReader br =
                new BufferedReader( new InputStreamReader( System.in ) );

            while ( true )
            {
                System.out.println( "enter mesage:" );
                String message = br.readLine();
                if ( message == null )
                {
                    // End of input: readLine() keeps returning null from
                    // here on, so continuing would loop forever sending
                    // null-valued elements.
                    break;
                }
                CacheElement ce = new CacheElement( "test", "test", message );
                LateralElementDescriptor led = new LateralElementDescriptor( ce );
                sender.send( led );
            }
        }
        catch ( Exception e )
        {
            System.out.println( e.toString() );
        }
    }

    // ILateralCacheObserver methods, do nothing here since
    // the connection is not registered, the udp service
    // is not registered.

    /**
     * Does nothing.
     *
     * @param cacheName ignored
     * @param obj ignored
     * @exception IOException never thrown by this implementation
     */
    public void addCacheListener( String cacheName, ICacheListener obj )
        throws IOException
    {
        // Empty
    }

    /**
     * Does nothing.
     *
     * @param obj ignored
     * @exception IOException never thrown by this implementation
     */
    public void addCacheListener( ICacheListener obj )
        throws IOException
    {
        // Empty
    }

    /**
     * Does nothing.
     *
     * @param cacheName ignored
     * @param obj ignored
     * @exception IOException never thrown by this implementation
     */
    public void removeCacheListener( String cacheName, ICacheListener obj )
        throws IOException
    {
        // Empty
    }

    /**
     * Does nothing.
     *
     * @param obj ignored
     * @exception IOException never thrown by this implementation
     */
    public void removeCacheListener( ICacheListener obj )
        throws IOException
    {
        // Empty
    }
}
1.1 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/LateralJGSender.java
Index: LateralJGSender.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.javagroups;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Reader;
import java.io.Serializable;
import java.util.Vector;
import java.util.Iterator;
import java.net.InetAddress;
import java.net.Socket;
import org.apache.stratum.jcs.auxiliary.lateral.LateralCacheInfo;
import org.apache.stratum.jcs.auxiliary.lateral.LateralElementDescriptor;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
import org.apache.stratum.jcs.auxiliary.lateral.LateralCacheAttributes;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.utils.JGSocketOpener;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.utils.JGRpcOpener;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.IJGConstants;
import org.javagroups.JChannel;
import org.javagroups.Channel;
import org.javagroups.Message;
import org.javagroups.blocks.RpcDispatcher;
import org.javagroups.util.RspList;
import org.javagroups.blocks.GroupRequest;
import org.apache.stratum.jcs.engine.CacheElement;
import org.apache.stratum.jcs.engine.behavior.ICacheElement;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.ILateralCacheJGListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogSource;
/**
 * Sends lateral cache commands to the group over the JavaGroups channel
 * obtained from {@link JGConnectionHolder}.
 * <p>
 * This class is based on the log4j SocketAppender class. It uses a different
 * repair structure, so it is significantly different.
 *
 * @author <a href="mailto:asmuts@yahoo.com">Aaron Smuts</a>
 * @created January 15, 2002
 * @version $Id: LateralJGSender.java,v 1.1 2002/02/24 06:15:42 asmuts Exp $
 */
public class LateralJGSender implements IJGConstants
{
    private final static Log log =
        LogSource.getInstance( LateralJGSender.class );

    /** Configuration used to look up the shared connection holder. */
    private ILateralCacheAttributes ilca;

    /** Host name passed to {@link #init(String, int)}. */
    private String remoteHost;

    /** Resolved form of the host; null when resolution failed. */
    private InetAddress address;

    int port = 1111;

    /** Channel messages are sent on. */
    private Channel javagroups;

    /** Dispatcher used for the remote get call. */
    private RpcDispatcher disp;

    int counter = 0;

    /**
     * Only block for 5 seconds before timing out on startup.
     */
    private final static int openTimeOut = 5000;

    /**
     * Creates a sender configured from the UDP multicast address and port of
     * the given attributes.
     *
     * @param lca lateral cache configuration
     * @exception IOException if the channel cannot be obtained
     */
    public LateralJGSender( ILateralCacheAttributes lca )
        throws IOException
    {
        this.ilca = lca;
        init( lca.getUdpMulticastAddr(), lca.getUdpMulticastPort() );
    }

    /**
     * Resolves the host and fetches the channel and dispatcher from the
     * connection holder.
     * <p>
     * A host that cannot be resolved no longer makes this method fail with a
     * NullPointerException while building a log message; the address is left
     * null and {@link #send} / {@link #sendAndReceive} report it as an
     * IOException, which is the condition they already check for.
     *
     * @param host host name or address to resolve
     * @param port port to record
     * @exception IOException if no channel is available or the holder fails
     */
    protected void init( String host, int port )
        throws IOException
    {
        this.port = port;
        this.address = getAddressByName( host );
        this.remoteHost = host;

        // Fall back to the configured name when resolution failed so the
        // log statements below can never dereference a null address.
        String target = ( address != null ) ? address.getHostName() : host;

        try
        {
            log.debug( "Attempting connection to " + target );

            JGConnectionHolder holder = JGConnectionHolder.getInstance( ilca );
            javagroups = holder.getChannel();
            disp = holder.getDispatcher();

            if ( javagroups == null )
            {
                throw new IOException( "javagroups is null" );
            }
        }
        catch ( java.net.ConnectException e )
        {
            log.debug( "Remote host " + target + " refused connection." );
            throw e;
        }
        catch ( Exception e )
        {
            log.debug( "Could not connect to " + target +
                ". Exception is " + e );
            throw toIOException( e );
        }
    }

    /**
     * Builds an IOException carrying the message of the given exception and
     * keeping that exception as its cause.
     *
     * @return the new IOException
     * @param e failure to wrap
     */
    private static IOException toIOException( Exception e )
    {
        IOException ioe = new IOException( e.getMessage() );
        ioe.initCause( e );
        return ioe;
    }

    /**
     * Resolves a host name, logging instead of throwing on failure.
     *
     * @return the resolved address, or null if it could not be resolved
     * @param host host name or address
     */
    private InetAddress getAddressByName( String host )
    {
        try
        {
            return InetAddress.getByName( host );
        }
        catch ( Exception e )
        {
            log.error( "Could not find address of [" + host + "]", e );
            return null;
        }
    }

    /**
     * Sends a command to the lateral cache listeners as a channel message.
     * A null descriptor is ignored.
     *
     * @param led descriptor to send
     * @exception IOException if no address was resolved or the channel
     *      rejects the message
     */
    public void send( LateralElementDescriptor led )
        throws IOException
    {
        log.debug( "sending LateralElementDescriptor" );

        if ( led == null )
        {
            return;
        }

        if ( address == null )
        {
            throw new IOException( "No remote host is set for LateralJGSender." );
        }

        try
        {
            Message send_msg = new Message( null, null, led );
            javagroups.send( send_msg );
        }
        catch ( Exception e )
        {
            log.error( "Detected problem with connection: " + e );
            throw toIOException( e );
        }
    }

    /**
     * Asks the group for an element by invoking "handleGet" remotely through
     * the dispatcher and returns the first response that is a cache element.
     * <p>
     * This is best effort: a failure of the remote call is logged and null
     * is returned. Responses that are not cache elements (null entries, or
     * anything else a member sent back) are skipped rather than ending the
     * scan. I'm afraid that we could get into a pretty bad blocking
     * situation here. This needs work.
     *
     * @return the first cache element found among the responses, or null
     * @param led descriptor whose element supplies the cache name and key
     * @exception IOException if no address was resolved
     */
    public Serializable sendAndReceive( LateralElementDescriptor led )
        throws IOException
    {
        ICacheElement ice = null;

        log.debug( "sendAndReceive led" );

        if ( led == null )
        {
            return null;
        }

        if ( address == null )
        {
            throw new IOException( "No remote host is set for LateralJGSender." );
        }

        try
        {
            // NOTE(review): the trailing 1000 is presumably the response
            // timeout in milliseconds -- confirm against RpcDispatcher.
            RspList rsp_list = disp.callRemoteMethods( null, "handleGet", (String)led.ce.getCacheName(), (Serializable)led.ce.getKey(),
                GroupRequest.GET_ALL, 1000 );

            log.debug( "rsp_list = " + rsp_list );
            Vector vec = rsp_list.getResults();
            log.debug( "rsp_list size = " + vec.size() );

            Iterator it = vec.iterator();
            while ( it.hasNext() )
            {
                Object result = it.next();
                if ( result instanceof ICacheElement )
                {
                    ice = ( ICacheElement ) result;
                    break;
                }
            }
        }
        catch ( Exception e )
        {
            log.error( "Problem getting element from the group", e );
        }

        return ice;
    }

    // Service Methods //

    /**
     * Sends an UPDATE command for the item.
     *
     * @param item element to distribute
     * @param requesterId id recorded on the descriptor as the originator
     * @exception IOException if sending fails
     */
    public void update( ICacheElement item, byte requesterId )
        throws IOException
    {
        LateralElementDescriptor led = new LateralElementDescriptor( item );
        led.requesterId = requesterId;
        led.command = led.UPDATE;
        send( led );
    }

    /**
     * Sends a removal for the key using this process's listener id as the
     * requester.
     *
     * @param cacheName name of the cache region
     * @param key key to remove
     * @exception IOException if sending fails
     */
    public void remove( String cacheName, Serializable key )
        throws IOException
    {
        remove( cacheName, key, LateralCacheInfo.listenerId );
    }

    /**
     * Sends a REMOVE command for the key. The descriptor carries a cache
     * element with a null value.
     *
     * @param cacheName name of the cache region
     * @param key key to remove
     * @param requesterId id recorded on the descriptor as the originator
     * @exception IOException if sending fails
     */
    public void remove( String cacheName, Serializable key, byte requesterId )
        throws IOException
    {
        CacheElement ce = new CacheElement( cacheName, key, null );
        LateralElementDescriptor led = new LateralElementDescriptor( ce );
        led.requesterId = requesterId;
        led.command = led.REMOVE;
        send( led );
    }

    /**
     * Does nothing.
     *
     * @exception IOException never thrown by this implementation
     */
    public void release()
        throws IOException
    {
        // nothing needs to be done
    }

    /**
     * Intended to close the connection used by all LateralJGSenders for this
     * lateral connection. Dispose requests should come into the facade and
     * be sent to all lateral cache services, which then call this method.
     * Currently it does nothing: the channel is left open.
     *
     * @param cache name of the cache being disposed (unused)
     * @exception IOException never thrown by this implementation
     */
    public void dispose( String cache )
        throws IOException
    {
        // Not implemented: closing here would close the connection shared
        // by all senders.
    }

    /**
     * Sends a remove-all for the region using this process's listener id as
     * the requester.
     *
     * @param cacheName name of the cache region
     * @exception IOException if sending fails
     */
    public void removeAll( String cacheName )
        throws IOException
    {
        removeAll( cacheName, LateralCacheInfo.listenerId );
    }

    /**
     * Sends a REMOVEALL command. The descriptor carries a placeholder
     * element whose key is the literal string "ALL".
     *
     * @param cacheName name of the cache region
     * @param requesterId id recorded on the descriptor as the originator
     * @exception IOException if sending fails
     */
    public void removeAll( String cacheName, byte requesterId )
        throws IOException
    {
        CacheElement ce = new CacheElement( cacheName, "ALL", null );
        LateralElementDescriptor led = new LateralElementDescriptor( ce );
        led.requesterId = requesterId;
        led.command = led.REMOVEALL;
        send( led );
    }

    /**
     * Manual test driver: reads lines from standard input and sends each one
     * as the value of a "test"/"test" cache element until input ends.
     *
     * @param args ignored
     */
    public static void main( String args[] )
    {
        try
        {
            LateralCacheAttributes lca = new LateralCacheAttributes();
            lca.setHttpServer( "localhost:8181" );
            LateralJGSender lur = new LateralJGSender( lca );

            BufferedReader br = new BufferedReader( new InputStreamReader( System.in ) );

            while ( true )
            {
                System.out.println( "enter mesage:" );
                String message = br.readLine();
                if ( message == null )
                {
                    // End of input: readLine() keeps returning null from
                    // here on, so continuing would loop forever sending
                    // null-valued elements.
                    break;
                }
                CacheElement ce = new CacheElement( "test", "test", message );
                LateralElementDescriptor led = new LateralElementDescriptor( ce );
                lur.send( led );
            }
        }
        catch ( Exception e )
        {
            System.out.println( e.toString() );
        }
    }
}
1.1 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/LateralJGReceiverConnection.java
Index: LateralJGReceiverConnection.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.javagroups;
/*
* The Apache Software License, Version 1.1
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Velocity", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
import java.io.InputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.net.Socket;
import java.util.Vector;
import org.apache.stratum.jcs.auxiliary.lateral.LateralCacheInfo;
import org.apache.stratum.jcs.auxiliary.lateral.LateralElementDescriptor;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.ILateralCacheJGListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogSource;
import org.javagroups.Channel;
import org.javagroups.JChannel;
import org.javagroups.Message;
/**
 * Separate thread run when a command comes into the LateralJGReceiver. Each
 * instance processes the single message it was constructed with.
 *
 * @author <a href="mailto:asmuts@yahoo.com">Aaron Smuts</a>
 * @version $Id: LateralJGReceiverConnection.java,v 1.7 2002/02/15 04:33:37
 *      jtaylor Exp $
 */
public class LateralJGReceiverConnection implements Runnable
{
    private final static Log log =
        LogSource.getInstance( LateralJGReceiverConnection.class );

    /** Never assigned in this class. */
    private Channel javagroups;

    /** Message to process. */
    private Message mes;

    /** Listener the decoded command is dispatched to. */
    private ILateralCacheJGListener ilcl;

    /**
     * Number of UPDATE commands handled by this instance.
     * NOTE(review): LateralJGReceiver creates a new instance per message, so
     * in that usage this never exceeds 1 and the "every 100 puts" debug line
     * in {@link #run()} cannot fire.
     */
    private int puts = 0;

    /**
     * Constructor for the LateralJGReceiverConnection object
     *
     * @param mes message whose payload is the command to process
     * @param ilcl listener that handles the command
     */
    public LateralJGReceiverConnection( Message mes, ILateralCacheJGListener ilcl )
    {
        this.ilcl = ilcl;
        this.mes = mes;
    }

    /**
     * Decodes the message payload as a {@link LateralElementDescriptor} and
     * dispatches it to the listener.
     * <p>
     * A message without a descriptor, or one whose requester id equals this
     * process's listener id, is logged and ignored. UPDATE, REMOVE and GET
     * commands are handled; any other command is logged and dropped. All
     * exceptions are caught and logged here.
     */
    public void run()
    {
        try
        {
            LateralElementDescriptor led = ( LateralElementDescriptor ) mes.getObject();

            if ( led == null )
            {
                log.debug( "LateralElementDescriptor is null" );
                // Nothing to dispatch; going on would dereference null.
                return;
            }

            if ( led.requesterId == LateralCacheInfo.listenerId )
            {
                log.debug( "from self" );
                return;
            }

            if ( log.isDebugEnabled() )
            {
                log.debug( "receiving LateralElementDescriptor from another, led = "
                     + ", led = " + led
                     + ", led.command = " + led.command
                     + ", led.ce = " + led.ce
                     + ", ilcl = " + ilcl );
            }

            if ( led.command == led.UPDATE )
            {
                puts++;
                if ( log.isDebugEnabled() && puts % 100 == 0 )
                {
                    log.debug( "puts = " + puts );
                }
                ilcl.handlePut( led.ce );
            }
            else if ( led.command == led.REMOVE )
            {
                ilcl.handleRemove( led.ce.getCacheName(), led.ce.getKey() );
            }
            else if ( led.command == led.GET )
            {
                // The looked-up value is not sent back from here.
                getAndRespond( led.ce.getCacheName(), led.ce.getKey() );
            }
            else
            {
                // NOTE(review): LateralJGSender also sends REMOVEALL, which
                // has no handler here; make the drop visible.
                log.warn( "Unhandled lateral command [" + led.command + "], ignoring" );
            }
        }
        catch ( java.io.EOFException e )
        {
            log.info( "Caught java.io.EOFException closing conneciton." );
        }
        catch ( java.net.SocketException e )
        {
            log.info( "Caught java.net.SocketException closing conneciton." );
        }
        catch ( Exception e )
        {
            log.error( "Unexpected exception. Closing conneciton", e );
        }
    }

    /**
     * Looks the key up through the listener. Despite the name, nothing is
     * written back to the requester here; the value is only returned to the
     * caller.
     *
     * @return the value returned by the listener's handleGet
     * @param cacheName name of the cache region
     * @param key key to look up
     * @exception Exception whatever the listener throws
     */
    private Serializable getAndRespond( String cacheName, Serializable key )
        throws Exception
    {
        Serializable obj = ilcl.handleGet( cacheName, key );

        if ( log.isDebugEnabled() )
        {
            log.debug( "obj = " + obj );
        }

        return obj;
    }
}
1.1 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/LateralJGReceiver.java
Index: LateralJGReceiver.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.javagroups;
/*
* The Apache Software License, Version 1.1
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Velocity", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.io.IOException;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.ILateralCacheJGListener;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.IJGConstants;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.utils.JGSocketOpener;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.utils.JGRpcOpener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogSource;
import org.javagroups.JChannel;
import org.javagroups.Channel;
import org.javagroups.Message;
import org.javagroups.blocks.RpcDispatcher;
import org.javagroups.ChannelNotConnectedException;
/**
 * Receives objects from the JavaGroups channel and starts a
 * {@link LateralJGReceiverConnection} thread for every message.
 *
 * @author <a href="mailto:asmuts@yahoo.com">Aaron Smuts</a>
 * @created January 15, 2002
 * @version $Id: LateralJGReceiver.java,v 1.1 2002/02/24 06:15:42 asmuts Exp $
 */
public class LateralJGReceiver implements IJGConstants, Runnable
{
    private final static Log log =
        LogSource.getInstance( LateralJGReceiver.class );

    /** TCP listener port from the configuration; only logged here. */
    private int port;

    /** Listener handed to each connection thread. */
    private ILateralCacheJGListener ilcl;

    /** Configuration used to look up the shared connection holder. */
    private ILateralCacheAttributes ilca;

    /**
     * How long the server will block on an accept(). 0 is infinite.
     * Not used by this class.
     */
    private final static int sTimeOut = 5000;

    /**
     * Pause between attempts after a failed receive, in milliseconds, so a
     * channel that is not (yet) usable does not cause a busy loop.
     */
    private final static int RETRY_PAUSE_MILLIS = 100;

    /**
     * Constructor for the LateralJGReceiver object
     *
     * @param ilca lateral cache configuration
     * @param ilcl listener that will handle received commands
     */
    public LateralJGReceiver( ILateralCacheAttributes ilca, ILateralCacheJGListener ilcl )
    {
        this.port = ilca.getTcpListenerPort();
        this.ilcl = ilcl;
        this.ilca = ilca;
        if ( log.isDebugEnabled() )
        {
            log.debug( "ilcl = " + ilcl );
        }
    }

    /**
     * Obtains the channel from the connection holder and then receives from
     * it in a loop. Each received {@link Message} is processed on a new
     * thread; anything else is only logged at debug level.
     * <p>
     * A failed receive is logged and retried after a short pause. The loop
     * ends only if the thread is interrupted during that pause. Failure to
     * obtain the channel is logged and ends the method.
     */
    public void run()
    {
        try
        {
            if ( log.isDebugEnabled() )
            {
                log.debug( "Listening on port " + port );
            }

            JGConnectionHolder holder = JGConnectionHolder.getInstance( ilca );
            Channel javagroups = holder.getChannel();
            // Not used below; the call is kept in case the holder sets the
            // dispatcher up on first access -- TODO confirm.
            RpcDispatcher disp = holder.getDispatcher();

            if ( javagroups == null )
            {
                log.error( "JavaGroups is null" );
                throw new IOException( "javagroups is null" );
            }

            while ( true )
            {
                if ( log.isDebugEnabled() )
                {
                    log.debug( "Wating for messages." );
                }

                try
                {
                    Object obj = javagroups.receive( 0 );
                    if ( obj instanceof Message )
                    {
                        log.info( "Starting new socket node." );
                        new Thread( new LateralJGReceiverConnection( ( Message ) obj, ilcl ) ).start();
                    }
                    else if ( log.isDebugEnabled() )
                    {
                        log.debug( obj );
                    }
                }
                catch ( ChannelNotConnectedException cnce )
                {
                    // Seems to periodically require about 50 tries; pause
                    // between them instead of spinning on the processor.
                    log.warn( cnce );
                    if ( !pauseBeforeRetry() )
                    {
                        break;
                    }
                }
                catch ( Exception e )
                {
                    log.error( "problem receiving", e );
                    if ( !pauseBeforeRetry() )
                    {
                        break;
                    }
                }
            }
        }
        catch ( Exception e )
        {
            log.error( "Major intialization problem", e );
        }
    }

    /**
     * Sleeps for {@link #RETRY_PAUSE_MILLIS} before the next receive attempt.
     *
     * @return true if the pause completed, false if the thread was
     *      interrupted (the interrupt status is restored)
     */
    private boolean pauseBeforeRetry()
    {
        try
        {
            Thread.sleep( RETRY_PAUSE_MILLIS );
            return true;
        }
        catch ( InterruptedException ie )
        {
            Thread.currentThread().interrupt();
            log.info( "Interrupted while waiting to retry receive; stopping receiver." );
            return false;
        }
    }
}
1.1 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/LateralGroupCacheJGListener.java
Index: LateralGroupCacheJGListener.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.javagroups;
/*
* The Apache Software License, Version 1.1
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Velocity", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.ILateralCacheJGListener;
import org.apache.stratum.jcs.engine.behavior.ICompositeCacheManager;
import org.apache.stratum.jcs.engine.behavior.ICacheType;
import org.apache.stratum.jcs.engine.control.group.GroupCacheManager;
import org.apache.stratum.jcs.engine.control.group.GroupCacheManager;
import org.apache.stratum.jcs.engine.control.group.GroupCacheManager;
import org.apache.stratum.jcs.engine.control.group.GroupCacheManagerFactory;
import org.apache.stratum.jcs.engine.control.group.GroupCacheManagerFactory;
import org.apache.stratum.jcs.engine.control.group.GroupCacheManagerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogSource;
/**
 * Lateral cache listener that resolves its cache manager through
 * {@link GroupCacheManagerFactory}; everything else is inherited from
 * {@link LateralCacheJGListener}.
 *
 * @author <a href="mailto:asmuts@yahoo.com">Aaron Smuts</a>
 * @created January 15, 2002
 * @version $Id: LateralGroupCacheJGListener.java,v 1.6 2002/02/15 04:33:37
 *      jtaylor Exp $
 */
public class LateralGroupCacheJGListener
    extends LateralCacheJGListener
    implements ILateralCacheJGListener
{
    private final static Log log =
        LogSource.getInstance( LateralGroupCacheJGListener.class );

    /**
     * Constructor for the LateralGroupCacheJGListener object. Use
     * {@link #getInstance(ILateralCacheAttributes)} to obtain a listener.
     *
     * @param ilca lateral cache attributes passed on to the superclass
     */
    protected LateralGroupCacheJGListener( ILateralCacheAttributes ilca )
    {
        super( ilca );
        log.debug( "creating LateralGroupCacheJGListener" );
    }

    /**
     * Returns the listener registered for the multicast address of the
     * given attributes, creating, initializing and registering a
     * LateralGroupCacheJGListener if none is registered yet.
     * <p>
     * The registry is the <code>instances</code> map inherited from
     * {@link LateralCacheJGListener}, so the lookup and the insert are both
     * done while holding that class's monitor. The lookup is repeated
     * inside the lock; re-testing a value read before the lock was taken
     * would let every waiting thread create its own listener.
     *
     * @param ilca lateral cache attributes; the UDP multicast address is
     *      used as the registry key
     * @return the registered listener for that address
     */
    public static ILateralCacheListener getInstance( ILateralCacheAttributes ilca )
    {
        String key = String.valueOf( ilca.getUdpMulticastAddr() );
        synchronized ( LateralCacheJGListener.class )
        {
            ILateralCacheListener ins = ( ILateralCacheListener ) instances.get( key );
            if ( ins == null )
            {
                ins = new LateralGroupCacheJGListener( ilca );
                ins.init();
                if ( log.isDebugEnabled() )
                {
                    log.debug( "created new listener " + ilca.getUdpMulticastAddr() );
                }
                instances.put( key, ins );
            }
            return ins;
        }
    }

    // override for new functionality
    // lazy init is too slow, find a better way
    /**
     * Makes sure the shared <code>cacheMgr</code> field is set, obtaining
     * it from {@link GroupCacheManagerFactory} on first use. Any exception
     * raised while doing so is logged and not rethrown, so the field may
     * still be null afterwards.
     */
    protected void getCacheManager()
    {
        try
        {
            if ( cacheMgr == null )
            {
                cacheMgr = ( ICompositeCacheManager ) GroupCacheManagerFactory.getInstance();
                if ( log.isDebugEnabled() )
                {
                    log.debug( " groupcache cacheMgr = " + cacheMgr );
                }
            }
            else
            {
                if ( log.isDebugEnabled() )
                {
                    log.debug( "already got groupcache cacheMgr = " + cacheMgr );
                }
            }
        }
        catch ( Exception e )
        {
            log.error( e );
        }
    }
}
1.1 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/LateralCacheJGListener.java
Index: LateralCacheJGListener.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.javagroups;
/*
* The Apache Software License, Version 1.1
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Velocity", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import org.apache.stratum.jcs.auxiliary.lateral.LateralCacheInfo;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.ILateralCacheJGListener;
import org.apache.stratum.jcs.engine.behavior.ICache;
import org.apache.stratum.jcs.engine.behavior.ICacheElement;
import org.apache.stratum.jcs.engine.behavior.ICompositeCache;
import org.apache.stratum.jcs.engine.behavior.ICompositeCacheManager;
import org.apache.stratum.jcs.engine.control.CacheManagerFactory;
import org.apache.stratum.jcs.engine.control.CompositeCacheManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogSource;
/**
 * Listener that applies lateral cache events (put, remove, removeAll, get,
 * dispose) to the local caches obtained from the cache manager.
 * <p>
 * Listeners are kept in a static registry keyed by UDP multicast address;
 * use {@link #getInstance(ILateralCacheAttributes)} to obtain one.
 *
 * @author <a href="mailto:asmuts@yahoo.com">Aaron Smuts</a>
 * @created January 15, 2002
 * @version $Id: LateralCacheJGListener.java,v 1.8 2002/02/15 04:33:37 jtaylor
 *      Exp $
 */
public class LateralCacheJGListener implements ILateralCacheJGListener, Serializable
{
    private final static Log log =
        LogSource.getInstance( LateralCacheJGListener.class );

    /**
     * Cache manager shared by all listeners; set lazily by
     * {@link #getCacheManager()}.
     */
    protected static transient ICompositeCacheManager cacheMgr;

    /**
     * Registry of listeners, keyed by the string form of the UDP multicast
     * address. Guarded by the monitor of LateralCacheJGListener.class.
     */
    protected final static HashMap instances = new HashMap();

    // instance vars
    private LateralJGReceiver receiver;
    private ILateralCacheAttributes ilca;
    private boolean inited = false;
    // Number of puts handled; only used for the periodic info log below.
    private int puts = 0;

    /**
     * Only need one since it does work for all regions, just reference by
     * multiple region names.
     *
     * @param ilca lateral cache attributes used when the receiver is created
     */
    protected LateralCacheJGListener( ILateralCacheAttributes ilca )
    {
        this.ilca = ilca;
    }

    /**
     * Creates the {@link LateralJGReceiver} for this listener and starts it
     * on a new thread.
     *
     * @throws IllegalStateException if the receiver cannot be created or
     *      started; the original exception is logged
     */
    public void init()
    {
        try
        {
            // need to connect based on type
            receiver = new LateralJGReceiver( ilca, this );
            Thread t = new Thread( receiver );
            t.start();
        }
        catch ( Exception ex )
        {
            log.error( ex );
            throw new IllegalStateException( ex.getMessage() );
        }
        inited = true;
    }

    /**
     * Lets the lateral cache set a listener id. The id is stored
     * unconditionally in the static <code>LateralCacheInfo.listenerId</code>
     * field, so it is shared by every listener.
     *
     * @param id The new listenerId value
     * @exception IOException declared by the interface; not thrown here
     */
    public void setListenerId( byte id )
        throws IOException
    {
        LateralCacheInfo.listenerId = id;
        if ( log.isDebugEnabled() )
        {
            log.debug( "set listenerId = " + id );
        }
    }

    /**
     * Gets the listener id stored in <code>LateralCacheInfo.listenerId</code>.
     *
     * @return The listenerId value
     * @exception IOException declared by the interface; not thrown here
     */
    public byte getListenerId()
        throws IOException
    {
        if ( log.isDebugEnabled() )
        {
            log.debug( "get listenerId = " + LateralCacheInfo.listenerId );
        }
        return LateralCacheInfo.listenerId;
    }

    /**
     * Returns the listener registered for the multicast address of the
     * given attributes, creating, initializing and registering a new
     * LateralCacheJGListener if none is registered yet.
     * <p>
     * The lookup and the insert both happen while holding the class
     * monitor. The lookup is repeated inside the lock; re-testing a value
     * read before the lock was taken would let every waiting thread create
     * its own listener and receiver thread.
     *
     * @param ilca lateral cache attributes; the UDP multicast address is
     *      used as the registry key
     * @return the registered listener for that address
     */
    public static ILateralCacheListener getInstance( ILateralCacheAttributes ilca )
    {
        String key = String.valueOf( ilca.getUdpMulticastAddr() );
        synchronized ( LateralCacheJGListener.class )
        {
            ILateralCacheListener ins = ( ILateralCacheListener ) instances.get( key );
            if ( ins == null )
            {
                ins = new LateralCacheJGListener( ilca );
                ins.init();
                if ( log.isDebugEnabled() )
                {
                    log.debug( "created new listener " + ilca.getUdpMulticastAddr() );
                }
                instances.put( key, ins );
            }
            return ins;
        }
    }

    //////////////////////////// implements the ILateralCacheListener interface. //////////////

    /**
     * Updates the local cache named by the element with the given element,
     * flagged as a remote invocation. Logs the running put count at info
     * level on every 100th put.
     *
     * @param cb the cache element to store
     * @exception IOException if the cache update fails with an I/O error
     */
    public void handlePut( ICacheElement cb )
        throws IOException
    {
        if ( log.isDebugEnabled() )
        {
            log.debug( "PUTTING ELEMENT FROM LATERAL" );
        }
        getCacheManager();
        ICompositeCache cache = ( ICompositeCache ) cacheMgr.getCache( cb.getCacheName() );
        cache.update( cb, ICache.REMOTE_INVOKATION );
        puts++;
        if ( puts % 100 == 0 )
        {
            log.info( "puts = " + puts );
        }
    }

    /**
     * Removes the entry for the key from the named local cache, flagged as
     * a remote invocation.
     *
     * @param cacheName name of the cache region
     * @param key key of the entry to remove
     * @exception IOException if the removal fails with an I/O error
     */
    public void handleRemove( String cacheName, Serializable key )
        throws IOException
    {
        if ( log.isDebugEnabled() )
        {
            log.debug( "handleRemove> cacheName=" + cacheName + ", key=" + key );
        }
        getCacheManager();
        // interface limitation here
        ICompositeCache cache = ( ICompositeCache ) cacheMgr.getCache( cacheName );
        cache.remove( key, ICache.REMOTE_INVOKATION );
    }

    /**
     * Removes every entry from the named local cache.
     *
     * @param cacheName name of the cache region
     * @exception IOException if the removal fails with an I/O error
     */
    public void handleRemoveAll( String cacheName )
        throws IOException
    {
        if ( log.isDebugEnabled() )
        {
            log.debug( "handleRemoveAll> cacheName=" + cacheName );
        }
        getCacheManager();
        ICache cache = cacheMgr.getCache( cacheName );
        cache.removeAll();
    }

    /**
     * Test get implementation. Looks the key up in the named local cache,
     * flagged as a remote invocation.
     *
     * @param cacheName name of the cache region
     * @param key key to look up
     * @return whatever the cache returns for the key
     * @exception IOException if the lookup fails with an I/O error
     */
    public Serializable handleGet( String cacheName, Serializable key )
        throws IOException
    {
        if ( log.isDebugEnabled() )
        {
            log.debug( "handleGet> cacheName=" + cacheName + ", key = " + key );
        }
        getCacheManager();
        ICompositeCache cache = ( ICompositeCache ) cacheMgr.getCache( cacheName );
        // get container
        return cache.get( key, true, ICache.REMOTE_INVOKATION );
    }

    /**
     * Frees the named cache on the cache manager, flagged as a remote
     * invocation.
     *
     * @param cacheName name of the cache region
     * @exception IOException if freeing the cache fails with an I/O error
     */
    public void handleDispose( String cacheName )
        throws IOException
    {
        if ( log.isDebugEnabled() )
        {
            log.debug( "handleDispose> cacheName=" + cacheName );
        }
        // Like the other handlers, make sure the manager has been looked up;
        // a dispose may be the first event this listener sees.
        getCacheManager();
        CompositeCacheManager cm = ( CompositeCacheManager ) cacheMgr;
        cm.freeCache( cacheName, ICache.REMOTE_INVOKATION );
    }

    // override for new functionality
    /**
     * Makes sure the shared <code>cacheMgr</code> field is set, obtaining
     * it from {@link CacheManagerFactory} on first use.
     */
    protected void getCacheManager()
    {
        if ( cacheMgr == null )
        {
            cacheMgr = ( ICompositeCacheManager ) CacheManagerFactory.getInstance();
            if ( log.isDebugEnabled() )
            {
                log.debug( "cacheMgr = " + cacheMgr );
            }
        }
        else
        {
            if ( log.isDebugEnabled() )
            {
                log.debug( "already got cacheMgr = " + cacheMgr );
            }
        }
    }
}
1.1 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/JGConnectionHolder.java
Index: JGConnectionHolder.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.javagroups;
/*
* The Apache Software License, Version 1.1
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Velocity", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
import java.util.HashMap;
import java.io.IOException;
import org.javagroups.JChannel;
import org.javagroups.Channel;
import org.javagroups.Message;
import org.javagroups.blocks.RpcDispatcher;
import org.javagroups.util.RspList;
import org.javagroups.blocks.GroupRequest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogSource;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.ILateralCacheJGListener;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.IJGConstants;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.utils.JGRpcOpener;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.utils.JGSocketOpener;
/**
 * Holds the JavaGroups channel and RPC dispatcher for one lateral cache
 * configuration. Holders are kept in a static registry keyed by UDP
 * multicast address; the channel and the dispatcher are opened lazily on
 * first request.
 */
public class JGConnectionHolder
{
    private final static Log log =
        LogSource.getInstance( JGConnectionHolder.class );

    private Channel jg;
    private RpcDispatcher disp;
    private ILateralCacheAttributes ilca;

    /**
     * Registry of holders, keyed by the string form of the UDP multicast
     * address. Guarded by its own monitor.
     */
    protected final static HashMap instances = new HashMap();

    /**
     * Returns the holder registered for the multicast address of the given
     * attributes, creating and registering one if none is registered yet.
     * <p>
     * The lookup and the insert happen under the registry's monitor. The
     * lookup is done inside the lock; re-testing a value read before the
     * lock was taken would let every waiting thread replace the registered
     * holder with a new one.
     *
     * @param ilca lateral cache attributes; the UDP multicast address is
     *      used as the registry key
     * @return the holder for that address, or null if creating it failed
     *      (the failure is logged)
     */
    public static JGConnectionHolder getInstance( ILateralCacheAttributes ilca )
    {
        String key = String.valueOf( ilca.getUdpMulticastAddr() );
        JGConnectionHolder ins = null;
        try
        {
            synchronized ( instances )
            {
                ins = ( JGConnectionHolder ) instances.get( key );
                if ( ins == null )
                {
                    ins = new JGConnectionHolder( ilca );
                    if ( log.isDebugEnabled() )
                    {
                        log.debug( "created new listener " + key );
                    }
                    instances.put( key, ins );
                }
            }
        }
        catch ( Exception e )
        {
            log.error( "trouble intializing", e );
        }
        return ins;
    }

    /**
     * Constructor for the JGConnectionHolder object
     *
     * @param ilca lateral cache attributes used when the channel and the
     *      dispatcher are opened
     */
    private JGConnectionHolder( ILateralCacheAttributes ilca )
    {
        this.ilca = ilca;
    }

    /**
     * Gets the channel of this holder, opening it through
     * {@link JGSocketOpener} with a 5000 ms timeout and the default group
     * name if it has not been opened yet.
     *
     * @return the channel, or null if it could not be opened; failures are
     *      logged rather than thrown
     * @exception IOException declared for callers; not thrown by this
     *      implementation
     */
    public Channel getChannel()
        throws IOException
    {
        try
        {
            if ( jg == null )
            {
                synchronized ( JGConnectionHolder.class )
                {
                    if ( jg == null )
                    {
                        jg = JGSocketOpener.openSocket( ilca, 5000, IJGConstants.DEFAULT_JG_GROUP_NAME );
                    }
                }
            }
        }
        catch ( Exception e )
        {
            log.error( "Problem getting channel", e );
        }
        return jg;
    }

    /**
     * Gets the RPC dispatcher of this holder, opening it through
     * {@link JGRpcOpener} with a 5000 ms timeout and the RPC group name if
     * it has not been opened yet. The dispatcher is bound to the listener
     * returned by {@link LateralGroupCacheJGListener#getInstance}.
     *
     * @return the dispatcher, or null if it could not be opened; failures
     *      are logged rather than thrown
     * @exception IOException declared for callers; not thrown by this
     *      implementation
     */
    public RpcDispatcher getDispatcher()
        throws IOException
    {
        try
        {
            if ( disp == null )
            {
                synchronized ( JGConnectionHolder.class )
                {
                    if ( disp == null )
                    {
                        disp = JGRpcOpener.openSocket( ( ILateralCacheJGListener ) LateralGroupCacheJGListener.getInstance( ilca ), ilca, 5000, IJGConstants.RPC_JG_GROUP_NAME );
                    }
                }
            }
        }
        catch ( Exception e )
        {
            log.error( e );
        }
        return disp;
    }
}
1.1 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/utils/JGSocketOpener.java
Index: JGSocketOpener.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.javagroups.utils;
/*
* The Apache Software License, Version 1.1
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Velocity", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
import java.io.IOException;
import java.io.InterruptedIOException;
import org.javagroups.JChannel;
import org.javagroups.Channel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogSource;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.IJGConstants;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
/**
 * Channel opener that will time out on the initial connect rather than
 * block forever: the channel is created and connected on a helper thread
 * and the caller waits for that thread for a bounded time only. Technique
 * from core java II.
 *
 * @author <a href="mailto:asmuts@yahoo.com">Aaron Smuts</a>
 * @created January 15, 2002
 * @version $Id: JGSocketOpener.java,v 1.1 2002/02/24 06:15:42 asmuts Exp $
 */
public class JGSocketOpener implements Runnable
{
    private final static Log log =
        LogSource.getInstance( JGSocketOpener.class );

    private ILateralCacheAttributes lca;
    private Channel javagroups;
    private String groupName;

    /**
     * Opens a channel for the given group on a helper thread and waits at
     * most <code>timeOut</code> milliseconds for that thread to finish.
     * <p>
     * If the wait is interrupted, the interrupt is logged and the calling
     * thread's interrupt flag is restored so callers can still observe it.
     *
     * @param lca lateral cache attributes supplying the multicast address
     *      and port
     * @param timeOut maximum time to wait for the helper thread, in
     *      milliseconds
     * @param groupName name of the group to connect to
     * @return whatever channel the helper thread has produced when the wait
     *      ends; null if no channel has been created by then, and possibly
     *      a channel whose connect has not completed if the wait timed out
     */
    public static Channel openSocket( ILateralCacheAttributes lca, int timeOut, String groupName )
    {
        JGSocketOpener opener = new JGSocketOpener( lca, groupName );
        Thread t = new Thread( opener );
        t.start();
        try
        {
            t.join( timeOut );
        }
        catch ( InterruptedException ire )
        {
            log.error( ire );
            // Do not swallow the interrupt; leave it visible to the caller.
            Thread.currentThread().interrupt();
        }
        return opener.getSocket();
    }

    /**
     * Constructor for the JGSocketOpener object
     *
     * @param lca lateral cache attributes supplying the multicast address
     *      and port
     * @param groupName name of the group to connect to
     */
    public JGSocketOpener( ILateralCacheAttributes lca, String groupName )
    {
        this.javagroups = null;
        this.lca = lca;
        this.groupName = groupName;
    }

    /**
     * Creates the channel with a fixed protocol stack on the configured
     * multicast address and port, sets the LOCAL option to false, and
     * connects to the group. Failures are logged with their stack trace
     * and not rethrown.
     */
    public void run()
    {
        try
        {
            // make configurable
            String props = "UDP(mcast_addr=" + lca.getUdpMulticastAddr() + ";mcast_port=" + lca.getUdpMulticastPort() + "):PING:FD:STABLE:NAKACK:UNICAST:" +
                "FRAG:FLUSH:GMS:VIEW_ENFORCER:STATE_TRANSFER:QUEUE";
            javagroups = new JChannel( props );
            javagroups.setOpt( javagroups.LOCAL, Boolean.FALSE );
            // could have a channel per region
            javagroups.connect( groupName );
        }
        catch ( Exception e )
        {
            log.error( "Problem opening JavaGroups channel for group " + groupName, e );
        }
    }

    /**
     * Gets the channel created by {@link #run()}.
     *
     * @return the channel, or null if none has been created
     */
    public Channel getSocket()
    {
        return javagroups;
    }
}
1.1 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/utils/JGRpcOpener.java
Index: JGRpcOpener.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.javagroups.utils;
/*
* The Apache Software License, Version 1.1
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Velocity", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
import java.io.IOException;
import java.io.InterruptedIOException;
import org.javagroups.JChannel;
import org.javagroups.Channel;
import org.javagroups.blocks.RpcDispatcher;
import org.javagroups.blocks.GroupRequest;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogSource;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheAttributes;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.IJGConstants;
import org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior.ILateralCacheJGListener;
/**
 * RPC dispatcher opener that will time out on the initial connect rather
 * than block forever: the channel and dispatcher are created on a helper
 * thread and the caller waits for that thread for a bounded time only.
 * Technique from core java II.
 *
 * @author <a href="mailto:asmuts@yahoo.com">Aaron Smuts</a>
 * @created January 15, 2002
 * @version $Id: JGRpcOpener.java,v 1.1 2002/02/24 06:15:42 asmuts Exp $
 */
public class JGRpcOpener implements Runnable
{
    private final static Log log =
        LogSource.getInstance( JGRpcOpener.class );

    private Channel rpcCh;
    private RpcDispatcher disp;
    private String groupName;
    private ILateralCacheJGListener ilcl;
    private ILateralCacheAttributes ilca;

    /**
     * Opens an RPC dispatcher for the given group on a helper thread and
     * waits at most <code>timeOut</code> milliseconds for that thread to
     * finish.
     * <p>
     * If the wait is interrupted, the interrupt is logged and the calling
     * thread's interrupt flag is restored so callers can still observe it.
     *
     * @param ilcl listener handed to the dispatcher
     * @param ilca lateral cache attributes supplying the multicast address
     *      and port
     * @param timeOut maximum time to wait for the helper thread, in
     *      milliseconds
     * @param groupName name of the group to connect to
     * @return whatever dispatcher the helper thread has produced when the
     *      wait ends; null if none has been created by then, and possibly
     *      a dispatcher whose channel has not finished connecting
     */
    public static RpcDispatcher openSocket( ILateralCacheJGListener ilcl, ILateralCacheAttributes ilca, int timeOut, String groupName )
    {
        JGRpcOpener opener = new JGRpcOpener( ilcl, ilca, groupName );
        Thread t = new Thread( opener );
        t.start();
        try
        {
            t.join( timeOut );
        }
        catch ( InterruptedException ire )
        {
            log.error( ire );
            // Do not swallow the interrupt; leave it visible to the caller.
            Thread.currentThread().interrupt();
        }
        return opener.getSocket();
    }

    /**
     * Constructor for the JGRpcOpener object
     *
     * @param ilcl listener handed to the dispatcher
     * @param ilca lateral cache attributes supplying the multicast address
     *      and port
     * @param groupName name of the group to connect to
     */
    public JGRpcOpener( ILateralCacheJGListener ilcl, ILateralCacheAttributes ilca, String groupName )
    {
        this.rpcCh = null;
        this.ilcl = ilcl;
        this.ilca = ilca;
        this.groupName = groupName;
    }

    /**
     * Creates the channel with a fixed protocol stack on the configured
     * multicast address and port, sets the LOCAL option to false, creates
     * the dispatcher on that channel, and then connects to the group.
     * Failures are logged with their stack trace and not rethrown.
     */
    public void run()
    {
        try
        {
            String props = "UDP(mcast_addr=" + ilca.getUdpMulticastAddr() + ";mcast_port=" + ilca.getUdpMulticastPort() + "):PING:FD:STABLE:NAKACK:UNICAST:FLUSH:GMS:VIEW_ENFORCER:QUEUE";
            rpcCh = new JChannel( props );
            rpcCh.setOpt( rpcCh.LOCAL, Boolean.FALSE );
            // The dispatcher is created before the channel is connected.
            disp = new RpcDispatcher( rpcCh, null, null, ilcl );
            rpcCh.connect( groupName );
        }
        catch ( Exception e )
        {
            log.error( "Problem opening JavaGroups RPC dispatcher for group " + groupName, e );
        }
    }

    /**
     * Gets the dispatcher created by {@link #run()}.
     *
     * @return the dispatcher, or null if none has been created
     */
    public RpcDispatcher getSocket()
    {
        return disp;
    }
}
1.1 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/behavior/ILateralCacheJGListener.java
Index: ILateralCacheJGListener.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior;
/*
* The Apache Software License, Version 1.1
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution, if
* any, must include the following acknowlegement:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowlegement may appear in the software itself,
* if and wherever such third-party acknowlegements normally appear.
*
* 4. The names "The Jakarta Project", "Velocity", and "Apache Software
* Foundation" must not be used to endorse or promote products derived
* from this software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache"
* nor may "Apache" appear in their names without prior written
* permission of the Apache Group.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
import java.io.Serializable;
import java.io.IOException;
import org.apache.stratum.jcs.auxiliary.lateral.behavior.ILateralCacheListener;
/**
 * Lateral cache listener for the JavaGroups transport. Adds an
 * initialization hook and a get handler to the operations declared by
 * {@link ILateralCacheListener}.
 *
 * @author <a href="mailto:asmuts@yahoo.com">Aaron Smuts</a>
 * @created February 19, 2002
 * @version $Id: asmuts
 *      Exp $
 */
public interface ILateralCacheJGListener extends ILateralCacheListener
{
    /**
     * Initializes the listener.
     */
    void init();

    /**
     * Tries to get a requested item from the cache.
     *
     * @param cacheName name of the cache region to look in
     * @param key key of the requested item
     * @return the result of the lookup
     * @exception IOException if the lookup fails with an I/O error
     */
    Serializable handleGet( String cacheName, Serializable key )
        throws IOException;
}
1.1 jakarta-turbine-stratum/src/java/org/apache/stratum/jcs/auxiliary/lateral/javagroups/behavior/IJGConstants.java
Index: IJGConstants.java
===================================================================
package org.apache.stratum.jcs.auxiliary.lateral.javagroups.behavior;
/**
 * String constants used by the JavaGroups lateral cache.
 */
public interface IJGConstants
{
    /** Handler name for the JavaGroups lateral cache. */
    String HANDLERNAME = "LATERAL_JG_CACHE";

    /** Group name for the default channel. */
    String DEFAULT_JG_GROUP_NAME = "JCS_CACHE";

    /** Group name for the RPC channel. */
    String RPC_JG_GROUP_NAME = "RPC_JCS_CACHE";
}
--
To unsubscribe, e-mail: <ma...@jakarta.apache.org>
For additional commands, e-mail: <ma...@jakarta.apache.org>