You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jcs-dev@jakarta.apache.org by as...@apache.org on 2004/05/14 06:59:40 UTC

cvs commit: jakarta-turbine-jcs/src/java/org/apache/jcs/auxiliary/javagroups JavaGroupsCacheFactory.java JavaGroupsCacheAttributes.java JavaGroupsCache.java

asmuts      2004/05/13 21:59:40

  Added:       src/java/org/apache/jcs/auxiliary/javagroups
                        JavaGroupsCacheFactory.java
                        JavaGroupsCacheAttributes.java JavaGroupsCache.java
  Log:
  trying to figure out what to add to this from the other.  need to prevent blocking on startup.
  
  Revision  Changes    Path
  1.1                  jakarta-turbine-jcs/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheFactory.java
  
  Index: JavaGroupsCacheFactory.java
  ===================================================================
  package org.apache.jcs.auxiliary.javagroups;
  
  
  /*
   * Copyright 2001-2004 The Apache Software Foundation.
   *
   * Licensed under the Apache License, Version 2.0 (the "License")
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  
  import org.apache.jcs.auxiliary.AuxiliaryCacheFactory;
  import org.apache.jcs.auxiliary.AuxiliaryCache;
  import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
  import org.apache.jcs.engine.control.CompositeCacheManager;
  import org.apache.jcs.engine.control.CompositeCache;
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.jgroups.ChannelFactory;
  import org.jgroups.Channel;
  
  /**
   * AuxiliaryCacheFactory for creating instances of {@link JavaGroupsCache}
   * for a particular CompositeCache and {@link JavaGroupsCacheAttributes}.
   *
   * @version $Id: JavaGroupsCacheFactory.java,v 1.1 2004/05/14 04:59:40 asmuts Exp $
   */
  public class JavaGroupsCacheFactory implements AuxiliaryCacheFactory
  {
      private final static Log log =
          LogFactory.getLog( JavaGroupsCacheFactory.class );
  
      private String name;
  
      public AuxiliaryCache createCache( AuxiliaryCacheAttributes iaca,
                                         CompositeCache cache )
      {
          try
          {
              // Cast provided attributes to JavaGroupsCacheAttributes
  
              JavaGroupsCacheAttributes attributes =
                  ( JavaGroupsCacheAttributes ) iaca;
  
              // Create a ChannelFactory using the classname specified in the
              // config as 'channelFactoryClassName'
  
              ChannelFactory factory = ( ChannelFactory ) Class.forName(
                  attributes.getChannelFactoryClassName() ).newInstance();
  
              // Create a channel based on 'channelProperties' from the config
  
              Channel channel =
                  factory.createChannel( attributes.getJGChannelProperties() );
  
              // Return a new JavaGroupsCache for the new channel.
  
              return new JavaGroupsCache( cache,
                                          channel,
                                          attributes.isGetFromPeers() );
          }
          catch ( Exception e )
          {
              log.error( "Failed to create JavaGroupsCache", e );
  
              return null;
          }
      }
  
      /**
       * Accessor for name property
       */
      public String getName()
      {
          return this.name;
      }
  
      /**
       * Mutator for name property
       */
      public void setName( String name )
      {
          this.name = name;
      }
  }
  
  
  
  1.1                  jakarta-turbine-jcs/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCacheAttributes.java
  
  Index: JavaGroupsCacheAttributes.java
  ===================================================================
  package org.apache.jcs.auxiliary.javagroups;
  
  
  /*
   * Copyright 2001-2004 The Apache Software Foundation.
   *
   * Licensed under the Apache License, Version 2.0 (the "License")
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  
  import org.apache.jcs.auxiliary.AuxiliaryCacheAttributes;
  
  /**
   * Attributes used by {@link JavaGroupsCacheFactory#createCache} to configure
   * an instance of {@link JavaGroupsCache}.
   *
   * <h3> Configurable Properties: </h3>
   *
   * <dl>
   *   <dt>channelFactoryClassName</dt>
   *   <dd>
   *     Name of an {@link org.jgroups.ChannelFactory} implementation which
   *     will be used to create the channel for the instance. Defaults to
   *     {@link org.jgroups.JChannelFactory}.
   *   </dd>
   *   <dt>channelProperties</dt>
   *   <dd>
   *     A JavaGroups properties object which will be used by the channel to
   *     create the protocol stack. Either a properties string, or the URL of
   *     a file containing the properties in XML form is valid. Defaults to null
   *     which causes the Channel implementation to use its defaults.
   *   </dd>
   * </dl>
   *
   * @version $Id: JavaGroupsCacheAttributes.java,v 1.1 2004/05/14 04:59:40 asmuts Exp $
   */
  public class JavaGroupsCacheAttributes implements AuxiliaryCacheAttributes
  {
      private String cacheName;
      private String name;
  
      private String channelFactoryClassName = "org.jgroups.JChannelFactory";
      private String channelProperties = null;
      private boolean getFromPeers = false;
  
      public String getChannelFactoryClassName()
      {
          return channelFactoryClassName;
      }
  
      public void setChannelFactoryClassName( String channelFactoryClassName )
      {
          this.channelFactoryClassName = channelFactoryClassName;
      }
  
      public String getJGChannelProperties()
      {
          return channelProperties;
      }
  
      public void setChannelProperties( String channelProperties )
      {
          this.channelProperties = channelProperties;
      }
  
      public boolean isGetFromPeers()
      {
          return getFromPeers;
      }
  
      public void setGetFromPeers( boolean getFromPeers )
      {
          this.getFromPeers = getFromPeers;
      }
  
      // ----------------------------------------------- AuxiliaryCacheAttributes
  
      /**
       * Accessor for cacheName property.
       */
      public String getCacheName()
      {
          return this.cacheName;
      }
  
      /**
       * Mutator for cacheName property.
       */
      public void setCacheName( String s )
      {
          this.cacheName = s;
      }
  
      /**
       * Accessor for name property.
       */
      public String getName()
      {
          return this.name;
      }
  
      /**
       * Mutator for name property.
       */
      public void setName( String name )
      {
          this.name = name;
      }
  
      /**
       * Return a copy of this JavaGroupsCacheAttributes, cast to an
       * AuxiliaryCacheAttributes
       */
      public AuxiliaryCacheAttributes copy()
      {
          return ( AuxiliaryCacheAttributes ) this.clone();
      }
  
      /**
       * Return a clone of this JavaGroupsCacheAttributes
       */
      public Object clone()
      {
          JavaGroupsCacheAttributes copy = new JavaGroupsCacheAttributes();
  
          copy.cacheName = this.cacheName;
          copy.name = this.name;
  
          copy.channelFactoryClassName = this.channelFactoryClassName;
          copy.channelProperties = this.channelProperties;
  
          return copy;
      }
  }
  
  
  
  
  1.1                  jakarta-turbine-jcs/src/java/org/apache/jcs/auxiliary/javagroups/JavaGroupsCache.java
  
  Index: JavaGroupsCache.java
  ===================================================================
  package org.apache.jcs.auxiliary.javagroups;
  
  
  /*
   * Copyright 2001-2004 The Apache Software Foundation.
   *
   * Licensed under the Apache License, Version 2.0 (the "License")
   * you may not use this file except in compliance with the License.
   * You may obtain a copy of the License at
   *
   *     http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  
  
  import org.apache.commons.logging.Log;
  import org.apache.commons.logging.LogFactory;
  import org.apache.jcs.auxiliary.AuxiliaryCache;
  import org.apache.jcs.engine.CacheConstants;
  import org.apache.jcs.engine.CacheElement;
  import org.apache.jcs.engine.behavior.ICacheElement;
  import org.apache.jcs.engine.behavior.ICacheType;
  import org.apache.jcs.engine.control.CompositeCache;
  import org.jgroups.Channel;
  import org.jgroups.Message;
  import org.jgroups.View;
  import org.jgroups.Address;
  import org.jgroups.MembershipListener;
  import org.jgroups.util.RspList;
  import org.jgroups.blocks.RequestHandler;
  import org.jgroups.blocks.GroupRequest;
  import org.jgroups.blocks.MessageDispatcher;
  
  import java.io.IOException;
  import java.io.Serializable;
  import java.util.Set;
  import java.util.Vector;
  
  /**
   * Auxiliary cache using javagroups. Expects to be created with a Channel,
   * the {@link JavaGroupsCacheFactory} is responsible for creating that channel.
   * To do so it uses configuration properties specified by an instance of
   * {@link JavaGroupsCacheAttributes}.
   * <p>
   * At creation time the provided channel is connected to a group having the
   * same name as the cache / region name this auxiliary is associated with.
   * update / remove / removeAll operations are broadcast to all members of the
   * group. A listener thread processes requests from other members of the group,
   * and dispatches to appropriate methods on the associated CompositeCache. </p>
   * <p>
   * Calls to get are currently ignored.
   * <p>
   * Messages are sent to peers asynchronously. Synchronous messaging could be
   * added using MessageDispatcher or RpcDispatcher. Combined with a get
   * implementation this could provide much higher cache consistency (but with
   * a substantial speed penalty).
   *
   * @version $Id: JavaGroupsCache.java,v 1.1 2004/05/14 04:59:40 asmuts Exp $
   */
  public class JavaGroupsCache
      implements AuxiliaryCache, RequestHandler, MembershipListener
  {
      private final Log log = LogFactory.getLog( JavaGroupsCache.class );
  
      private String cacheName;
      private int status;
  
      private boolean getFromPeers;
  
      private CompositeCache cache;
  
      private Channel channel;
  
      private MessageDispatcher dispatcher;
  
      public JavaGroupsCache( CompositeCache cache,
                              Channel channel,
                              boolean getFromPeers )
          throws Exception
      {
          this.cache = cache;
  
          this.cacheName = cache.getCacheName();
          this.channel = channel;
  
          this.getFromPeers = getFromPeers;
  
          // The adapter listens to the channel and fires MessageListener events
          // on this object.
  
          dispatcher = new MessageDispatcher( channel, null, this, this );
  
          // Connect channel to the 'group' for our region name
  
          channel.setOpt( Channel.LOCAL, Boolean.FALSE );
  
          channel.connect( cacheName );
  
          // If all the above succeed, the cache is now alive.
  
          this.status = CacheConstants.STATUS_ALIVE;
  
          log.info( "Initialized for cache: " + cacheName );
      }
  
      public void send( ICacheElement element, int command )
      {
          Request request = new Request( element, command );
  
          try
          {
              dispatcher.castMessage( null,
                                      new Message( null, null, request ),
                                      GroupRequest.GET_NONE,
                                      0 );
  
          }
          catch ( Exception e )
          {
              log.error( "Failed to send JavaGroups message", e );
          }
      }
  
      // ----------------------------------------------- interface AuxiliaryCache
  
      /**
       * Sends the provided element to all peers (connected to the same channel
       * and region name).
       *
       * @param ce CacheElement to replicate
       * @throws IOException Never thrown by this implementation
       */
      public void update( ICacheElement ce ) throws IOException
      {
          send( ce, Request.UPDATE );
      }
  
      /**
       * If 'getFromPeers' is true, this will attempt to get the requested
       * element from ant other members of the group.
       *
       * @param key
       * @return
       * @throws IOException Never thrown by this implementation
       */
      public ICacheElement get( Serializable key ) throws IOException
      {
          if ( getFromPeers )
          {
              CacheElement element = new CacheElement( cacheName, key, null );
  
              Request request = new Request( element, Request.GET );
  
              // Cast message and wait for all responses.
  
              // FIXME: we can stop waiting after the first not null response,
              //        that is more difficult to implement however.
  
              RspList responses =
                  dispatcher.castMessage( null,
                                          new Message( null, null, request ),
                                          GroupRequest.GET_ALL,
                                          0 );
  
              // Get results only gives the responses which were not null
  
              Vector results = responses.getResults();
  
              // If there were any non null results, return the first
  
              if ( results.size() > 0 )
              {
                  return ( ICacheElement ) results.get( 0 );
              }
          }
  
          return null;
      }
  
      /**
       * Sends a request to all peers to remove the element having the provided
       * key.
       *
       * @param key Key of element to be removed
       * @throws IOException Never thrown by this implementation
       */
      public boolean remove( Serializable key ) throws IOException
      {
          CacheElement ce = new CacheElement( cacheName, key, null );
  
          send( ce, Request.REMOVE );
  
          return false;
      }
  
      /**
       * Sends a request to remove ALL elements from the peers
       *
       * @throws IOException Never thrown by this implementation
       */
      public void removeAll() throws IOException
      {
          CacheElement ce = new CacheElement( cacheName, null, null );
  
          send( ce, Request.REMOVE_ALL );
      }
  
      /**
       * Dispose this cache, terminates the listener thread and disconnects the
       * channel from the group.
       *
       * @throws IOException
       */
      public void dispose() throws IOException
      {
          // This will join the scheduler thread and ensure everything terminates
  
          dispatcher.stop();
  
          // Now we can disconnect from the group and close the channel
  
          channel.disconnect();
          channel.close();
  
          status = CacheConstants.STATUS_DISPOSED;
  
          log.info( "Disposed for cache: " + cacheName );
      }
  
      /**
       * Since this is a lateral, size is not defined.
       *
       * @return Always returns 0
       */
      public int getSize()
      {
          return 0;
      }
  
      /**
       * Returns the status of this auxiliary.
       *
       * @return One of the status constants from {@link CacheConstants}
       */
      public int getStatus()
      {
          return status;
      }
  
      /**
       * Accessor for cacheName property
       *
       * @return Name of cache / region this auxiliary is associated with.
       */
      public String getCacheName()
      {
          return cacheName;
      }
  
      /**
       * Not implemented (I believe since get is not supported, this should also
       * not be).
       *
       * @param group Ignored
       * @return Always reurns null
       */
      public Set getGroupKeys( String group )
      {
          return null;
      }
  
      // --------------------------------------------------- interface ICacheType
  
      /**
       * Get the cache type (always Lateral).
       *
       * @return Always returns ICacheType.LATERAL_CACHE
       */
      public int getCacheType()
      {
          return ICacheType.LATERAL_CACHE;
      }
  
      // ----------------------------------------------- interface RequestHandler
  
      /**
       * Handles a message from a peer. The message should contain a Request,
       * and depending on the command this will call localUpdate, localRemove,
       * or localRemoveAll on the associated CompositeCache.
       *
       * @param msg The JavaGroups Message
       * @return Always returns null
       */
      public Object handle( Message msg )
      {
          try
          {
              Request request = ( Request ) msg.getObject();
  
              // Switch based on the command and invoke the
              // appropriate method on the associate composite cache
  
              switch ( request.getCommand() )
              {
                  case Request.GET:
  
                      return cache.localGet( request.getCacheElement().getKey() );
                      // break;
  
                  case Request.UPDATE:
  
                      cache.localUpdate( request.getCacheElement() );
                      break;
  
                  case Request.REMOVE:
  
                      cache.localRemove( request.getCacheElement().getKey() );
                      break;
  
                  case Request.REMOVE_ALL:
  
                      cache.localRemoveAll();
                      break;
  
                  default:
  
                      log.error( "Recieved unknown command" );
              }
          }
          catch ( Exception e )
          {
              log.error( "Failed to process received JavaGroups message", e );
          }
  
          return null;
      }
  
      // ------------------------------------------- interface MembershipListener
  
      public void viewAccepted( View view )
      {
          log.info( "View Changed: " + String.valueOf( view ) );
      }
  
      public void suspect( Address suspectedAddress ) { }
  
      public void block() { }
  
      // ---------------------------------------------------------- inner classes
  
      /**
       * Object for messages, wraps the command type (update, remove, or remove
       * all) and original cache element to distribute.
       */
      static class Request implements Serializable
      {
          public final static int UPDATE = 1;
          public final static int REMOVE = 2;
          public final static int REMOVE_ALL = 3;
          public final static int GET = 5;
  
          private ICacheElement cacheElement;
          private int command;
  
          public Request( ICacheElement cacheElement, int command )
          {
              this.cacheElement = cacheElement;
              this.command = command;
          }
  
          public ICacheElement getCacheElement()
          {
              return cacheElement;
          }
  
          public int getCommand()
          {
              return command;
          }
      }
  }
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: turbine-jcs-dev-unsubscribe@jakarta.apache.org
For additional commands, e-mail: turbine-jcs-dev-help@jakarta.apache.org