You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by jm...@apache.org on 2002/10/29 06:15:30 UTC

cvs commit: xml-axis/java/src/org/apache/axis/ime/internal/util KeyedBuffer.java WorkerPool.java NonPersistentKeyedBuffer.java

jmsnell     2002/10/28 21:15:30

  Modified:    java/src/org/apache/axis/ime/internal
                        NonPersistentMessageExchangeCorrelatorService.java
                        MessageExchangeProvider.java
                        MessageExchangeImpl.java
               java/src/org/apache/axis/ime MessageExchange.java
                        MessageExchangeCorrelatorService.java
  Added:       java/src/org/apache/axis/ime/internal/util/handler
                        HandlerWrapper.java
               java/src/org/apache/axis/ime/internal
                        MessageExchangeSendContext.java
                        MessageExchangeSendListener.java
                        FirstComeFirstServeDispatchPolicy.java
                        MessageExchangeReceiveContext.java
                        ReceivedMessageDispatchPolicy.java
               java/src/org/apache/axis/ime MessageContextListener.java
               java/src/org/apache/axis/ime/internal/util KeyedBuffer.java
                        WorkerPool.java NonPersistentKeyedBuffer.java
  Removed:     java/src/org/apache/axis/ime/internal/util/handler
                        HandlerWrapper1.java HandlerWrapper2.java
               java/src/org/apache/axis/ime/internal
                        MessageExchangeProvider1.java MessageWorker.java
                        MessageExchangeProvider2.java
                        NonPersistentMessageChannel.java
                        MessageWorkerGroup.java
               java/src/org/apache/axis/ime MessageChannel.java
                        MessageExchangeReceiveListener.java
                        MessageExchangeContextListener.java
                        MessageExchangeContext.java
  Log:
  Some fairly significant changes here.  I went through and simplified the 
  MessageExchange interface and org.apache.axis.ime package and reworked
  the internal impl quite a bit to improve the way sending and receiving
  is done.  This still needs to be tested, but everything builds.
  
  Revision  Changes    Path
  1.1                  xml-axis/java/src/org/apache/axis/ime/internal/util/handler/HandlerWrapper.java
  
  Index: HandlerWrapper.java
  ===================================================================
  /*
   * The Apache Software License, Version 1.1
   *
   *
   * Copyright (c) 2001 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution,
   *    if any, must include the following acknowledgment:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowledgment may appear in the software itself,
   *    if and wherever such third-party acknowledgments normally appear.
   *
   * 4. The names "Axis" and "Apache Software Foundation" must
   *    not be used to endorse or promote products derived from this
   *    software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache",
   *    nor may "Apache" appear in their name, without prior written
   *    permission of the Apache Software Foundation.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   */
  
  package org.apache.axis.ime.internal.util.handler;
  
  import org.apache.axis.Handler;
  import org.apache.axis.MessageContext;
  import org.apache.axis.ime.MessageExchangeCorrelator;
  import org.apache.axis.ime.MessageContextListener;
  import org.apache.axis.ime.MessageExchangeFaultListener;
  import org.apache.axis.ime.internal.MessageExchangeProvider;
  import org.apache.axis.ime.internal.MessageExchangeSendContext;
  import org.apache.axis.ime.internal.MessageExchangeSendListener;
  import org.apache.axis.ime.internal.ReceivedMessageDispatchPolicy;
  import org.apache.axis.ime.internal.FirstComeFirstServeDispatchPolicy;
  
  /**
   * Used to wrap synchronous handlers (e.g. Axis 1.0 transports)
   * 
   * @author James M Snell (jasnell@us.ibm.com)
   */
  /**
   * Used to wrap synchronous handlers (e.g. Axis 1.0 transports) so that
   * they can be driven through the {@link MessageExchangeProvider}
   * send/receive buffers.
   *
   * Each send context handed to {@link Listener#onSend} is passed to the
   * wrapped handler's <code>invoke</code> method; once that call returns,
   * the same context is placed in the provider's <code>RECEIVE</code>
   * buffer under its correlator.
   */
  public class HandlerWrapper
          extends MessageExchangeProvider {
  
      /** The synchronous handler every send is delegated to. */
      private final Handler handler;
  
      /**
       * @param handler the synchronous handler to invoke for each send
       */
      public HandlerWrapper(Handler handler) {
          this.handler = handler;
      }
  
      /**
       * Creates a new {@link Listener} bound to the wrapped handler.
       * A fresh instance is returned on every call.
       *
       * @see org.apache.axis.ime.internal.MessageExchangeProvider#getMessageExchangeSendListener()
       */
      protected MessageExchangeSendListener getMessageExchangeSendListener() {
          return new Listener(handler);
      }
  
      /**
       * Creates a new first-come-first-serve dispatch policy over this
       * provider's <code>RECEIVE</code> and <code>RECEIVE_REQUESTS</code>
       * buffers.
       *
       * @see org.apache.axis.ime.internal.MessageExchangeProvider#getReceivedMessageDispatchPolicy()
       */
      protected ReceivedMessageDispatchPolicy getReceivedMessageDispatchPolicy() {
          return new FirstComeFirstServeDispatchPolicy(RECEIVE, RECEIVE_REQUESTS);
      }
  
      /**
       * Send listener that invokes a handler synchronously and then queues
       * the context on the enclosing provider's <code>RECEIVE</code> buffer.
       * It is an inner (non-static) class because it needs access to that
       * buffer.
       */
      public class Listener
              implements MessageExchangeSendListener {
  
          /** The handler invoked for each send context. */
          private final Handler handler;
  
          /**
           * @param handler the handler to invoke from {@link #onSend}
           */
          public Listener(Handler handler) {
              this.handler = handler;
          }
  
          /**
           * Invokes the handler with the context's message context and, if
           * that succeeds, puts the context in <code>RECEIVE</code> keyed by
           * its correlator.
           *
           * Any exception raised while doing so is reported to the context's
           * fault listener; if the context has no fault listener the
           * exception is dropped.
           *
           * @param context the send context to process
           * @see org.apache.axis.ime.internal.MessageExchangeSendListener#onSend(MessageExchangeSendContext)
           */
          public void onSend(
                  MessageExchangeSendContext context) {
              // Looked up before the try block so it is available to the
              // catch clause below.
              MessageExchangeFaultListener listener = 
                  context.getMessageExchangeFaultListener();
              try {
                  MessageContext msgContext =
                          context.getMessageContext();
                  MessageExchangeCorrelator correlator =
                          context.getMessageExchangeCorrelator();
              
                  // should I do init's and cleanup's in here?  
                  handler.invoke(msgContext);
  
                  // The handler has returned, so hand the context over to
                  // the receive side.
                  RECEIVE.put(correlator, context);
              } catch (Exception exception) {
                  if (listener != null) {
                      listener.onFault(
                              context.getMessageExchangeCorrelator(),
                              exception);
                  }
              }
          }
      }
  }
  
  
  
  1.3       +3 -4      xml-axis/java/src/org/apache/axis/ime/internal/NonPersistentMessageExchangeCorrelatorService.java
  
  Index: NonPersistentMessageExchangeCorrelatorService.java
  ===================================================================
  RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/NonPersistentMessageExchangeCorrelatorService.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- NonPersistentMessageExchangeCorrelatorService.java	28 Oct 2002 13:50:40 -0000	1.2
  +++ NonPersistentMessageExchangeCorrelatorService.java	29 Oct 2002 05:15:29 -0000	1.3
  @@ -55,7 +55,6 @@
   
   package org.apache.axis.ime.internal;
   
  -import org.apache.axis.ime.MessageExchangeContext;
   import org.apache.axis.ime.MessageExchangeCorrelator;
   import org.apache.axis.ime.MessageExchangeCorrelatorService;
   
  @@ -74,7 +73,7 @@
        */
       public void put(
               MessageExchangeCorrelator correlator,
  -            MessageExchangeContext context) {
  +            Object context) {
           synchronized (contexts) {
               contexts.put(correlator, context);
           }
  @@ -83,9 +82,9 @@
       /**
        * @see org.apache.axis.ime.MessageExchangeCorrelatorService#get(MessageExchangeCorrelator)
        */
  -    public MessageExchangeContext get(MessageExchangeCorrelator correlator) {
  +    public Object get(MessageExchangeCorrelator correlator) {
           synchronized (contexts) {
  -            return (MessageExchangeContext) contexts.remove(correlator);
  +            return contexts.remove(correlator);
           }
       }
   
  
  
  
  1.3       +123 -11   xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeProvider.java
  
  Index: MessageExchangeProvider.java
  ===================================================================
  RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeProvider.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- MessageExchangeProvider.java	28 Oct 2002 13:50:40 -0000	1.2
  +++ MessageExchangeProvider.java	29 Oct 2002 05:15:29 -0000	1.3
  @@ -55,38 +55,68 @@
   
   package org.apache.axis.ime.internal;
   
  -import org.apache.axis.ime.MessageChannel;
  +import org.apache.axis.i18n.Messages;
  +import org.apache.axis.MessageContext;
   import org.apache.axis.ime.MessageExchange;
  +import org.apache.axis.ime.MessageContextListener;
  +import org.apache.axis.ime.MessageExchangeCorrelator;
   import org.apache.axis.ime.MessageExchangeFactory;
  +import org.apache.axis.ime.MessageExchangeFaultListener;
  +import org.apache.axis.ime.internal.util.WorkerPool;
  +import org.apache.axis.ime.internal.util.KeyedBuffer;
  +import org.apache.axis.ime.internal.util.NonPersistentKeyedBuffer;
   
   /**
  - * Serves as a base class for MessageExchangeProviders that
  - * need to thread pooling on send AND receive message 
  - * flows (as opposed to MessageExchangeProvider2 which only
  - * does thread pooling on send flows).
  - * 
    * @author James M Snell (jasnell@us.ibm.com)
    */
   public abstract class MessageExchangeProvider
           implements MessageExchangeFactory {
   
  +    public static final long SELECT_TIMEOUT = 1000 * 30;
       public static final long DEFAULT_THREAD_COUNT = 5;
   
  -    protected final MessageWorkerGroup WORKERS = new MessageWorkerGroup();
  -    protected final MessageChannel SEND = new NonPersistentMessageChannel(WORKERS);
  -    protected final MessageChannel RECEIVE = new NonPersistentMessageChannel(WORKERS);
  +    protected final WorkerPool WORKERS = new WorkerPool();
  +    protected final KeyedBuffer SEND = new NonPersistentKeyedBuffer(WORKERS);
  +    protected final KeyedBuffer RECEIVE = new NonPersistentKeyedBuffer(WORKERS);
  +    protected final KeyedBuffer RECEIVE_REQUESTS = new NonPersistentKeyedBuffer(WORKERS);
   
       protected boolean initialized = false;
   
  +    protected abstract MessageExchangeSendListener getMessageExchangeSendListener();
  +
  +    protected abstract ReceivedMessageDispatchPolicy getReceivedMessageDispatchPolicy();
  +
       public MessageExchange createMessageExchange() {
  -        return new MessageExchangeImpl(this, SEND, RECEIVE);
  +        return new MessageExchangeImpl(this);
       }
   
       public void init() {
           init(DEFAULT_THREAD_COUNT);
       }
   
  -    public abstract void init(long THREAD_COUNT);
  +    public void init(long THREAD_COUNT) {
  +        if (initialized)
  +            throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
  +        for (int n = 0; n < THREAD_COUNT; n++) {
  +            WORKERS.addWorker(new MessageSender(WORKERS, SEND, getMessageExchangeSendListener()));
  +            WORKERS.addWorker(new MessageReceiver(WORKERS, RECEIVE, getReceivedMessageDispatchPolicy()));
  +        }
  +        initialized = true;
  +    }
  +    
  +    public void processReceive(
  +            MessageExchangeReceiveContext context) {
  +        RECEIVE_REQUESTS.put(
  +            context.getMessageExchangeCorrelator(),
  +            context);
  +    }
  +    
  +    public void processSend(
  +            MessageExchangeSendContext context) {
  +        SEND.put(
  +            context.getMessageExchangeCorrelator(),
  +            context);
  +    }
   
       public void shutdown() {
           shutdown(false);
  @@ -108,6 +138,88 @@
       public void awaitShutdown(long shutdown)
               throws InterruptedException {
           WORKERS.awaitShutdown(shutdown);
  +    }
  +
  +
  +
  +  // -- Worker Classes --- //
  +    public static class MessageReceiver 
  +            implements Runnable {
  +        
  +        protected WorkerPool pool;
  +        protected KeyedBuffer channel;
  +        protected ReceivedMessageDispatchPolicy policy;
  +    
  +        protected MessageReceiver(
  +                WorkerPool pool,
  +                KeyedBuffer channel,
  +                ReceivedMessageDispatchPolicy policy) {
  +            this.pool = pool;
  +            this.channel = channel;
  +            this.policy = policy;
  +        }
  +    
  +        /**
  +         * @see java.lang.Runnable#run()
  +         */
  +        public void run() {
  +            try {
  +                while (!pool.isShuttingDown()) {
  +                    MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT);
  +                    policy.dispatch(context);
  +                }
  +            } catch (Throwable t) {
  +                // kill the thread if any type of exception occurs.
  +                // don't worry, we'll create another one to replace it
  +                // if we're not currently in the process of shutting down.
  +                // once I get the logging function plugged in, we'll
  +                // log whatever errors do occur
  +            } finally {
  +                pool.workerDone(this);
  +            }
  +        }
  +    
  +    }
  +
  +
  +
  +    public static class MessageSender 
  +            implements Runnable {
  +    
  +        protected WorkerPool pool;
  +        protected KeyedBuffer channel;
  +        protected MessageExchangeSendListener listener;
  +    
  +        protected MessageSender(
  +                WorkerPool pool,
  +                KeyedBuffer channel,
  +                MessageExchangeSendListener listener) {
  +            this.pool = pool;
  +            this.channel = channel;
  +            this.listener = listener;
  +        }
  +        
  +        /**
  +         * @see java.lang.Runnable#run()
  +         */
  +        public void run() {
  +            try {
  +                while (!pool.isShuttingDown()) {
  +                    MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT);
  +                    if (context != null)
  +                        listener.onSend(context);
  +                }
  +            } catch (Throwable t) {
  +                // kill the thread if any type of exception occurs.
  +                // don't worry, we'll create another one to replace it
  +                // if we're not currently in the process of shutting down.
  +                // once I get the logging function plugged in, we'll
  +                // log whatever errors do occur
  +            } finally {
  +                pool.workerDone(this);
  +            }
  +        }
  +    
       }
   
   }
  
  
  
  1.3       +112 -160  xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeImpl.java
  
  Index: MessageExchangeImpl.java
  ===================================================================
  RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeImpl.java,v
  retrieving revision 1.2
  retrieving revision 1.3
  diff -u -r1.2 -r1.3
  --- MessageExchangeImpl.java	28 Oct 2002 13:50:40 -0000	1.2
  +++ MessageExchangeImpl.java	29 Oct 2002 05:15:29 -0000	1.3
  @@ -58,16 +58,14 @@
   import org.apache.axis.AxisFault;
   import org.apache.axis.MessageContext;
   import org.apache.axis.i18n.Messages;
  -import org.apache.axis.ime.MessageChannel;
   import org.apache.axis.ime.MessageExchange;
   import org.apache.axis.ime.MessageExchangeConstants;
  -import org.apache.axis.ime.MessageExchangeContext;
  -import org.apache.axis.ime.MessageExchangeContextListener;
  -import org.apache.axis.ime.MessageExchangeCorrelator;
   import org.apache.axis.ime.MessageExchangeFaultListener;
  -import org.apache.axis.ime.MessageExchangeLifecycle;
  -import org.apache.axis.ime.MessageExchangeReceiveListener;
   import org.apache.axis.ime.MessageExchangeStatusListener;
  +import org.apache.axis.ime.MessageExchangeCorrelator;
  +import org.apache.axis.ime.MessageExchangeCorrelatorService;
  +import org.apache.axis.ime.MessageContextListener;
  +import org.apache.axis.ime.MessageExchangeLifecycle;
   import org.apache.axis.ime.internal.util.uuid.UUIDGenFactory;
   
   /**
  @@ -76,25 +74,19 @@
   public class MessageExchangeImpl
           implements MessageExchange, MessageExchangeLifecycle {
   
  +    private static final long NO_TIMEOUT = -1;
       public static final long WORKER_COUNT = 5;
       public static final long DEFAULT_TIMEOUT = 1000 * 20;
   
  -    private MessageExchangeProvider provider;
  -    private MessageChannel send;
  -    private MessageChannel receive;
  -    private MessageExchangeReceiveListener receiveListener;
  -    private MessageExchangeStatusListener statusListener;
  -    private MessageExchangeFaultListener faultListener;
       private MessageWorkerGroup workers = new MessageWorkerGroup();
  +    private MessageExchangeFaultListener faultListener;
  +    private MessageExchangeStatusListener statusListener;
  +    private MessageExchangeProvider provider;
       private boolean listening = false;
       protected Holder holder;
   
       public MessageExchangeImpl(
  -            MessageExchangeProvider provider,
  -            MessageChannel sendChannel,
  -            MessageChannel receiveChannel) {
  -        this.send = sendChannel;
  -        this.receive = receiveChannel;
  +            MessageExchangeProvider provider) {
       }
   
       /**
  @@ -103,6 +95,16 @@
       public MessageExchangeCorrelator send(
               MessageContext context)
               throws AxisFault {
  +        return send(context,null); // should do default listener
  +    }
  +
  +    /**
  +     * @see org.apache.axis.ime.MessageExchange#send(MessageContext)
  +     */
  +    public MessageExchangeCorrelator send(
  +            MessageContext context,
  +            MessageContextListener listener)
  +            throws AxisFault {
           MessageExchangeCorrelator correlator =
                   (MessageExchangeCorrelator) context.getProperty(
                           MessageExchangeConstants.MESSAGE_CORRELATOR_PROPERTY);
  @@ -113,88 +115,64 @@
                       MessageExchangeConstants.MESSAGE_CORRELATOR_PROPERTY,
                       correlator);
           }
  -        MessageExchangeContext meContext =
  -                MessageExchangeContext.newInstance(
  -                        correlator,
  -                        statusListener,
  -                        receiveListener,
  -                        faultListener,
  -                        context);
  -        send.put(correlator, meContext);
  +        if (listener != null) {
  +            provider.processReceive(
  +                MessageExchangeReceiveContext.newInstance(
  +                    correlator,
  +                    listener,
  +                    faultListener,
  +                    statusListener));
  +        }
  +        provider.processSend(
  +            MessageExchangeSendContext.newInstance(
  +                correlator,
  +                context,
  +                faultListener,
  +                statusListener));
           return correlator;
       }
   
       /**
  -     * @see org.apache.axis.ime.MessageExchange#setMessageExchangeStatusListener(MessageExchangeStatusListener)
  -     */
  -    public void setMessageExchangeStatusListener(
  -            MessageExchangeStatusListener listener)
  -            throws AxisFault {
  -        if (listening)
  -            throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
  -        this.statusListener = listener;
  -    }
  -
  -    /**
  -     * @see org.apache.axis.ime.MessageExchange#setMessageExchangeReceiveListener(MessageExchangeReceiveListener)
  +     * @see org.apache.axis.ime.MessageExchange#receive()
        */
  -    public void setMessageExchangeReceiveListener(
  -            MessageExchangeReceiveListener listener)
  +    public MessageContext receive() 
               throws AxisFault {
  -        if (listening)
  -            throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
  -        this.receiveListener = listener;
  +        return receive(null,NO_TIMEOUT);
       }
   
       /**
  -     * @see org.apache.axis.ime.MessageExchange#setMessageExchangeReceiveListener(MessageExchangeReceiveListener)
  +     * @see org.apache.axis.ime.MessageExchange#receive(long)
        */
  -    public void setMessageExchangeFaultListener(
  -            MessageExchangeFaultListener listener)
  +    public MessageContext receive(
  +            long timeout) 
               throws AxisFault {
  -        if (listening)
  -            throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
  -        this.faultListener = listener;
  +        return receive(null,timeout);
       }
   
       /**
  -     * @see org.apache.axis.ime.MessageExchange#cancel(MessageExchangeCorrelator)
  +     * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator)
        */
  -    public MessageContext cancel(
  -            MessageExchangeCorrelator correlator)
  +    public MessageContext receive(
  +            MessageExchangeCorrelator correlator) 
               throws AxisFault {
  -        MessageExchangeContext context = send.cancel(correlator);
  -        if (context != null)
  -            return context.getMessageContext();
  -        else
  -            return null;
  +        return receive(correlator,NO_TIMEOUT);
       }
   
       /**
  -     * @see org.apache.axis.ime.MessageExchange#getReceiveChannel()
  +     * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator,long)
        */
  -    public MessageChannel getReceiveChannel() {
  -        return receive;
  -    }
  -
  -    /**
  -     * @see org.apache.axis.ime.MessageExchange#getSendChannel()
  -     */
  -    public MessageChannel getSendChannel() {
  -        return send;
  -    }
  -
  -
  -    public MessageContext sendAndReceive(
  -            MessageContext context)
  +    public MessageContext receive(
  +            MessageExchangeCorrelator correlator,
  +            long timeout) 
               throws AxisFault {
           holder = new Holder();
           Listener listener = new Listener(holder);
  -        this.setMessageExchangeFaultListener(listener);
  -        this.setMessageExchangeReceiveListener(listener);
           try {
  -            this.send(context);
  -            holder.waitForNotify();
  +            this.receive(correlator,listener);
  +            if (timeout != NO_TIMEOUT) 
  +              holder.waitForNotify(timeout);
  +            else 
  +              holder.waitForNotify();
           } catch (InterruptedException ie) {
               throw AxisFault.makeFault(ie);
           }
  @@ -207,17 +185,54 @@
           return null;
       }
   
  +    /**
  +     * @see org.apache.axis.ime.MessageExchange#receive(MessageContextListener)
  +     */
  +    public void receive(
  +            MessageContextListener listener) 
  +            throws AxisFault {
  +        receive(null,listener);
  +    }
  +
  +    /**
  +     * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator,MessageContextListener)
  +     */
  +    public void receive(
  +            MessageExchangeCorrelator correlator,
  +            MessageContextListener listener) 
  +            throws AxisFault {
  +        provider.processReceive(
  +            MessageExchangeReceiveContext.newInstance(
  +                correlator,
  +                listener,
  +                faultListener,
  +                statusListener));
  +    }
  +
  +    /**
  +     * @see org.apache.axis.ime.MessageExchange#sendAndReceive(MessageContext)
  +     */
  +    public MessageContext sendAndReceive(
  +            MessageContext context)
  +            throws AxisFault {
  +        return sendAndReceive(context,NO_TIMEOUT);
  +    }
  +
  +    /**
  +     * @see org.apache.axis.ime.MessageExchange#sendAndReceive(MessageContext,long)
  +     */
       public MessageContext sendAndReceive(
               MessageContext context,
               long timeout)
               throws AxisFault {
           holder = new Holder();
           Listener listener = new Listener(holder);
  -        this.setMessageExchangeFaultListener(listener);
  -        this.setMessageExchangeReceiveListener(listener);
           try {
  -            this.send(context);
  -            holder.waitForNotify(timeout);
  +            this.send(context,listener);
  +            if (timeout != NO_TIMEOUT) 
  +              holder.waitForNotify(timeout);
  +            else 
  +              holder.waitForNotify();
           } catch (InterruptedException ie) {
               throw AxisFault.makeFault(ie);
           }
  @@ -230,44 +245,8 @@
           return null;
       }
   
  -    /**
  -     * @see org.apache.axis.ime.MessageExchange#startListening()
  -     */
  -    public void startListening() {
  -        if (provider instanceof MessageExchangeProvider1)
  -            throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00"));
  -        for (int n = 0; n < WORKER_COUNT; n++) {
  -            workers.addWorker(receive, new ReceiverListener());
  -        }
  -        listening = true;
  -    }
   
  -    /**
  -     * @see org.apache.axis.ime.MessageExchange#startListening()
  -     */
  -    public void startListening(MessageExchangeCorrelator correlator) {
  -        throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException01", "Unsupported For Now"));
  -    }
  -
  -    /**
  -     * @see org.apache.axis.ime.MessageExchange#stopListening()
  -     */
  -    public void stopListening() {
  -        stopListening(false);
  -    }
  -
  -    /**
  -     * @see org.apache.axis.ime.MessageExchange#stopListening(boolean)
  -     */
  -    public void stopListening(boolean force) {
  -        if (provider instanceof MessageExchangeProvider1)
  -            throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00"));
  -        if (!force)
  -            workers.safeShutdown();
  -        else
  -            workers.shutdown();
  -        listening = false;
  -    }
  +  // -- Utility Classes --- //
   
       private class Holder {
           private MessageExchangeCorrelator correlator;
  @@ -305,8 +284,7 @@
       }
   
       public class Listener
  -            implements MessageExchangeReceiveListener,
  -            MessageExchangeFaultListener {
  +            extends MessageContextListener {
   
           protected Holder holder;
   
  @@ -335,37 +313,8 @@
       }
   
   
  -    private class ReceiverListener
  -            implements MessageExchangeContextListener {
   
  -        /**
  -         * @see org.apache.axis.ime.MessageExchangeContextListener#onMessageExchangeContext(MessageExchangeContext)
  -         */
  -        public void onMessageExchangeContext(
  -                MessageExchangeContext context) {
  -
  -            MessageContext msgContext =
  -                    context.getMessageContext();
  -            MessageExchangeCorrelator correlator =
  -                    context.getMessageExchangeCorrelator();
  -
  -            try {
  -                // there should be code here to see if the message
  -                // contains a fault.  if so, the fault listener should
  -                // be invoked
  -                if (msgContext != null &&
  -                        msgContext.getResponseMessage() != null &&
  -                        receiveListener != null) {
  -                    receiveListener.onReceive(correlator, msgContext);
  -                }
  -            } catch (Exception exception) {
  -                if (faultListener != null)
  -                    faultListener.onFault(
  -                            correlator, exception);
  -            }
  -
  -        }
  -    }
  +  // -- MessageExchangeLifecycle Implementation --- //
   
       /**
        * @see org.apache.axis.ime.MessageExchangeLifecycle#awaitShutdown()
  @@ -404,18 +353,21 @@
           provider.shutdown(force);
       }
   
  -    /**
  -     * @see org.apache.axis.ime.MessageExchange#receive()
  -     */
  -    public MessageContext receive() throws AxisFault {
  -        throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00"));
  +    public synchronized void setMessageExchangeFaultListener(
  +            MessageExchangeFaultListener listener) {
  +        this.faultListener = listener;
       }
  -
  -    /**
  -     * @see org.apache.axis.ime.MessageExchange#receive(long)
  -     */
  -    public MessageContext receive(long timeout) throws AxisFault {
  -        throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00"));
  +    
  +    public synchronized MessageExchangeFaultListener getMessageExchangeFaultListener() {
  +        return this.faultListener;
  +    }
  +    
  +    public synchronized void setMessageExchangeStatusListener(
  +            MessageExchangeStatusListener listener) {
  +        this.statusListener = listener;
  +    }
  +    
  +    public synchronized MessageExchangeStatusListener getMessageExchangeStatusListener() {
  +        return this.statusListener;
       }
  -
   }
  
  
  
  1.1                  xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeSendContext.java
  
  Index: MessageExchangeSendContext.java
  ===================================================================
  /*
   * The Apache Software License, Version 1.1
   *
   *
   * Copyright (c) 2001 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution,
   *    if any, must include the following acknowledgment:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowledgment may appear in the software itself,
   *    if and wherever such third-party acknowledgments normally appear.
   *
   * 4. The names "Axis" and "Apache Software Foundation" must
   *    not be used to endorse or promote products derived from this
   *    software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache",
   *    nor may "Apache" appear in their name, without prior written
   *    permission of the Apache Software Foundation.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   */
  package org.apache.axis.ime.internal;
  
  import org.apache.axis.MessageContext;
  import org.apache.axis.ime.MessageExchangeCorrelator;
  import org.apache.axis.ime.MessageExchangeFaultListener;
  import org.apache.axis.ime.MessageExchangeStatusListener;
  
  import java.io.Serializable;
  
  /**
   * Value holder bundling everything associated with one outbound
   * message: its correlator, the Axis MessageContext itself, and the
   * optional fault and status listeners.
   * 
   * Instances are created through {@link #newInstance} and expose
   * read-only accessors only.  The class is meant to be treated as
   * immutable for thread safety; note, however, that the fields are
   * not declared final, so this is a convention rather than something
   * the compiler enforces.
   * 
   * Note on serialization (from the original author): the class is
   * marked Serializable, but org.apache.axis.MessageContext is
   * currently NOT serializable.  MessageContext needs to change before
   * persistent Channels and CorrelatorServices can take advantage
   * of this.
   * 
   * @author James M Snell (jasnell@us.ibm.com)
   */
  public final class MessageExchangeSendContext
          implements Serializable {
  
      protected MessageExchangeCorrelator correlator;
      protected MessageExchangeFaultListener faultListener;
      protected MessageExchangeStatusListener statusListener;
      protected MessageContext context;
  
      /**
       * Instances are normally obtained via {@link #newInstance}.
       */
      protected MessageExchangeSendContext() {
      }
  
      /**
       * Creates a send context populated with the supplied values.
       * No argument is validated; each is stored as given.
       * 
       * @param correlator the correlator identifying the exchange
       * @param context the message context being sent
       * @param faultListener listener for faults
       * @param statusListener listener for status updates
       * @return a newly created, fully populated send context
       */
      public static MessageExchangeSendContext newInstance(
              MessageExchangeCorrelator correlator,
              MessageContext context,
              MessageExchangeFaultListener faultListener,
              MessageExchangeStatusListener statusListener) {
          MessageExchangeSendContext sendContext =
                  new MessageExchangeSendContext();
          sendContext.statusListener = statusListener;
          sendContext.faultListener = faultListener;
          sendContext.context = context;
          sendContext.correlator = correlator;
          return sendContext;
      }
  
      /**
       * @return the correlator supplied at creation
       */
      public MessageExchangeCorrelator getMessageExchangeCorrelator() {
          return correlator;
      }
  
      /**
       * @return the message context supplied at creation
       */
      public MessageContext getMessageContext() {
          return context;
      }
  
      /**
       * @return the fault listener supplied at creation
       */
      public MessageExchangeFaultListener getMessageExchangeFaultListener() {
          return faultListener;
      }
  
      /**
       * @return the status listener supplied at creation
       */
      public MessageExchangeStatusListener getMessageExchangeStatusListener() {
          return statusListener;
      }
  }
  
  
  
  1.1                  xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeSendListener.java
  
  Index: MessageExchangeSendListener.java
  ===================================================================
  /*
   * The Apache Software License, Version 1.1
   *
   *
   * Copyright (c) 2001 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution,
   *    if any, must include the following acknowledgment:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowledgment may appear in the software itself,
   *    if and wherever such third-party acknowledgments normally appear.
   *
   * 4. The names "Axis" and "Apache Software Foundation" must
   *    not be used to endorse or promote products derived from this
   *    software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache",
   *    nor may "Apache" appear in their name, without prior written
   *    permission of the Apache Software Foundation.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   */
  package org.apache.axis.ime.internal;
  
  import java.io.Serializable;
  
  /**
   * Serializable callback that is handed a
   * {@link MessageExchangeSendContext} through {@link #onSend}.
   * 
   * NOTE(review): presumably invoked by the provider for each message
   * taken off the send buffer -- confirm against the caller.
   * 
   * @author James M Snell (jasnell@us.ibm.com)
   */
  public interface MessageExchangeSendListener extends Serializable {
  
      /**
       * Called with the send context of a message.
       * 
       * @param context the correlator, message context and listeners
       *                describing the message
       */
      void onSend(MessageExchangeSendContext context);
  
  }
  
  
  
  1.1                  xml-axis/java/src/org/apache/axis/ime/internal/FirstComeFirstServeDispatchPolicy.java
  
  Index: FirstComeFirstServeDispatchPolicy.java
  ===================================================================
  /*
   * The Apache Software License, Version 1.1
   *
   *
   * Copyright (c) 2001 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution,
   *    if any, must include the following acknowledgment:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowledgment may appear in the software itself,
   *    if and wherever such third-party acknowledgments normally appear.
   *
   * 4. The names "Axis" and "Apache Software Foundation" must
   *    not be used to endorse or promote products derived from this
   *    software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache",
   *    nor may "Apache" appear in their name, without prior written
   *    permission of the Apache Software Foundation.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   */
  package org.apache.axis.ime.internal;
  
  import org.apache.axis.MessageContext;
  import org.apache.axis.ime.MessageExchangeCorrelator;
  import org.apache.axis.ime.MessageContextListener;
  import org.apache.axis.ime.MessageExchangeFaultListener;
  import org.apache.axis.ime.internal.util.KeyedBuffer;
  
  /**
   * Dispatch policy that hands a message to the first receive request
   * found for it, and buffers the message when no request is found.
   * 
   * @author James M Snell (jasnell@us.ibm.com)
   */
  public class FirstComeFirstServeDispatchPolicy
          implements ReceivedMessageDispatchPolicy {
  
      protected KeyedBuffer RECEIVE_REQUESTS;
      protected KeyedBuffer RECEIVE;
  
      /**
       * @param RECEIVE buffer that holds messages for which no receive
       *                request was found
       * @param RECEIVE_REQUESTS buffer consulted for pending receive
       *                requests
       */
      public FirstComeFirstServeDispatchPolicy(
              KeyedBuffer RECEIVE,
              KeyedBuffer RECEIVE_REQUESTS) {
          this.RECEIVE_REQUESTS = RECEIVE_REQUESTS;
          this.RECEIVE = RECEIVE;
      }
  
      /**
       * Routes one message:
       * <ol>
       * <li>look up a receive request keyed by the message's
       *     correlator;</li>
       * <li>if there is none, fall back to RECEIVE_REQUESTS.get()
       *     (per the original author's notes, the first "anonymous"
       *     receive request);</li>
       * <li>if there is still none, store the message in RECEIVE under
       *     its correlator;</li>
       * <li>otherwise deliver the message to the request's
       *     listener.</li>
       * </ol>
       * 
       * @param context the message to dispatch
       */
      public void dispatch(
              MessageExchangeSendContext context) {
  
          MessageExchangeCorrelator correlator =
                  context.getMessageExchangeCorrelator();
  
          MessageExchangeReceiveContext pending =
                  (MessageExchangeReceiveContext) RECEIVE_REQUESTS.get(correlator);
          if (pending == null) {
              pending = (MessageExchangeReceiveContext) RECEIVE_REQUESTS.get();
          }
  
          // Nobody is waiting: keep the message until a request shows up.
          if (pending == null) {
              RECEIVE.put(correlator, context);
              return;
          }
  
          deliver(pending, correlator, context);
      }
  
      /**
       * Passes the message to the receive request's context listener.
       * Any Exception thrown by the listener is forwarded to the
       * request's fault listener when one is set; with no fault
       * listener the exception is dropped.
       */
      private void deliver(
              MessageExchangeReceiveContext pending,
              MessageExchangeCorrelator correlator,
              MessageExchangeSendContext context) {
          MessageExchangeFaultListener onFault =
                  pending.getMessageExchangeFaultListener();
          MessageContextListener onMessage =
                  pending.getMessageContextListener();
          MessageContext message = context.getMessageContext();
          try {
              onMessage.onReceive(correlator, message);
          } catch (Exception failure) {
              if (onFault != null) {
                  onFault.onFault(correlator, failure);
              }
          }
      }
  
  }
  
  
  
  1.1                  xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeReceiveContext.java
  
  Index: MessageExchangeReceiveContext.java
  ===================================================================
  package org.apache.axis.ime.internal;
  
  import org.apache.axis.ime.MessageContextListener;
  import org.apache.axis.ime.MessageExchangeFaultListener;
  import org.apache.axis.ime.MessageExchangeStatusListener;
  import org.apache.axis.ime.MessageExchangeCorrelator;
  
  /**
   * Value holder describing one receive request: the correlator it is
   * registered under plus the message, fault and status listeners.
   * 
   * Instances are created through {@link #newInstance}; only getters
   * are exposed.
   * 
   * @author James M Snell (jasnell@us.ibm.com)
   */
  public class MessageExchangeReceiveContext {
  
      protected MessageContextListener listener;
      protected MessageExchangeFaultListener faultListener;
      protected MessageExchangeStatusListener statusListener;
      protected MessageExchangeCorrelator correlator;
  
      /**
       * Instances are normally obtained via {@link #newInstance}.
       */
      protected MessageExchangeReceiveContext() {
      }
  
      /**
       * Creates a receive context populated with the supplied values.
       * No argument is validated; each is stored as given.
       * 
       * @param correlator the correlator for the request
       * @param listener listener to which messages are delivered
       * @param faultListener listener for faults
       * @param statusListener listener for status updates
       * @return a newly created, fully populated receive context
       */
      public static MessageExchangeReceiveContext newInstance(
              MessageExchangeCorrelator correlator,
              MessageContextListener listener,
              MessageExchangeFaultListener faultListener,
              MessageExchangeStatusListener statusListener) {
          MessageExchangeReceiveContext receiveContext =
                  new MessageExchangeReceiveContext();
          receiveContext.statusListener = statusListener;
          receiveContext.faultListener = faultListener;
          receiveContext.listener = listener;
          receiveContext.correlator = correlator;
          return receiveContext;
      }
  
      /**
       * @return the correlator supplied at creation
       */
      public MessageExchangeCorrelator getMessageExchangeCorrelator() {
          return correlator;
      }
  
      /**
       * @return the message listener supplied at creation
       */
      public MessageContextListener getMessageContextListener() {
          return listener;
      }
  
      /**
       * @return the fault listener supplied at creation
       */
      public MessageExchangeFaultListener getMessageExchangeFaultListener() {
          return faultListener;
      }
  
      /**
       * @return the status listener supplied at creation
       */
      public MessageExchangeStatusListener getMessageExchangeStatusListener() {
          return statusListener;
      }
  }
  
  
  
  1.1                  xml-axis/java/src/org/apache/axis/ime/internal/ReceivedMessageDispatchPolicy.java
  
  Index: ReceivedMessageDispatchPolicy.java
  ===================================================================
  /*
   * The Apache Software License, Version 1.1
   *
   *
   * Copyright (c) 2001 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution,
   *    if any, must include the following acknowledgment:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowledgment may appear in the software itself,
   *    if and wherever such third-party acknowledgments normally appear.
   *
   * 4. The names "Axis" and "Apache Software Foundation" must
   *    not be used to endorse or promote products derived from this
   *    software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache",
   *    nor may "Apache" appear in their name, without prior written
   *    permission of the Apache Software Foundation.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   */
  package org.apache.axis.ime.internal;
  
  /**
   * Strategy deciding what happens to a message described by a
   * {@link MessageExchangeSendContext}; see
   * FirstComeFirstServeDispatchPolicy for an implementation.
   * 
   * @author James M Snell (jasnell@us.ibm.com)
   */
  public interface ReceivedMessageDispatchPolicy {
  
      /**
       * Dispatches a single message.
       * 
       * @param context the correlator, message context and listeners
       *                describing the message
       */
      void dispatch(MessageExchangeSendContext context);
  
  }
  
  
  
  1.5       +54 -65    xml-axis/java/src/org/apache/axis/ime/MessageExchange.java
  
  Index: MessageExchange.java
  ===================================================================
  RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/MessageExchange.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- MessageExchange.java	28 Oct 2002 21:59:45 -0000	1.4
  +++ MessageExchange.java	29 Oct 2002 05:15:29 -0000	1.5
  @@ -80,22 +80,23 @@
               throws AxisFault;
   
       /**
  -     * Will attempt to cancel the outbound MessageExchange 
  -     * process for a given message context. Returns true if 
  -     * an only if the MessageContext was canceled.  A false 
  -     * response indicates that the MessageContext could not 
  -     * be removed from the outbound channel for whatever 
  -     * reason.
  -     * @param MessageExchangeCorrelator The correlator for the message being canceled
  -     * @return MessageContext The canceled MessageContext
  +     * Send an outbound message.  (Impl's of this method
  +     * need to create a new MessageExchangeCorrelator and 
  +     * put it into the MessageContext if one does not already
  +     * exist.)
  +     * @param MessageContext The Axis MessageContext being sent
  +     * @param MessageContextListener The listener to which responses, faults, and status updates should be delivered
  +     * @return MessageExchangeCorrelator The correlator for the sent MessageContext
        * @throws AxisFault
        */
  -    public MessageContext cancel(
  -            MessageExchangeCorrelator correlator)
  +    public MessageExchangeCorrelator send(
  +            MessageContext context,
  +            MessageContextListener listener)
               throws AxisFault;
   
       /**
        * Waits indefinitely for a message to be received
  +     * (blocking)
        * @return MessageContext The received MessageContext
        * @throws AxisFault
        */
  @@ -105,6 +106,7 @@
       /**
        * Waits the specified amount of time for a message to 
        * be received
  +     * (blocking)
        * @param long The amount of time (ms) to wait
        * @return MessageContext The received MessageContext
        * @throws AxisFault
  @@ -114,29 +116,51 @@
               throws AxisFault;
   
       /**
  -     * Will instruct the MessageExchange provider to 
  -     * wait for a message to be received.
  +     * Waits indefinitely for a message matching the 
  +     * specified correlator
  +     * (blocking)
  +     * @param MessageExchangeCorrelator
  +     * @return MessageContext
        * @throws AxisFault
        */
  -    public void startListening()
  +    public MessageContext receive(
  +            MessageExchangeCorrelator correlator)
               throws AxisFault;
   
       /**
  -     * Will instruct the MessageExchange provider to
  -     * wait for a specific MessageExchangeCorrelator
  -     * @param MessageExchangeCorrelator The correlator of the MessageContext to listen for
  +     * Waits the specified amount of time for a message matching the 
  +     * specified correlator
  +     * (blocking)
  +     * @param MessageExchangeCorrelator
  +     * @param long timeout
  +     * @return MessageContext
        * @throws AxisFault
        */
  -    public void startListening(
  -            MessageExchangeCorrelator correlator)
  +    public MessageContext receive(
  +            MessageExchangeCorrelator correlator,
  +            long timeout)
               throws AxisFault;
   
       /**
  -     * Will instruct the MessageExchange provider to 
  -     * stop listening
  +     * Registers a listener for receiving messages
  +     * (nonblocking)
  +     * @param MessageContextListener
        * @throws AxisFault
        */
  -    public void stopListening()
  +    public void receive(
  +            MessageContextListener listener)
  +            throws AxisFault;
  +
  +    /**
  +     * Registers a listener for receiving messages
  +     * (nonblocking)
  +     * @param MessageExchangeCorrelator
  +     * @param MessageContextListener
  +     * @throws AxisFault
  +     */            
  +    public void receive(
  +            MessageExchangeCorrelator correlator,
  +            MessageContextListener listener)
               throws AxisFault;
   
       /**
  @@ -161,49 +185,14 @@
               long timeout)
               throws AxisFault;
   
  -    /**
  -     * Allows applications to listen for changes to
  -     * the current disposition of the MessageExchange operation
  -     * (push model)
  -     * @param MessageExchangeStatusListener
  -     * @throws AxisFault
  -     */
  -    public void setMessageExchangeStatusListener(
  -            MessageExchangeStatusListener listener)
  -            throws AxisFault;
  -
  -    /**
  -     * Allows applications to listen for inbound messages
  -     * (push model)
  -     * @param MessageExchangeReceiveListener
  -     * @throws AxisFault
  -     */
  -    public void setMessageExchangeReceiveListener(
  -            MessageExchangeReceiveListener listener)
  -            throws AxisFault;
  -
  -    /**
  -     * Allows applications to listen for faults/exceptions
  -     * (push model)
  -     * @param MessageExchangeFaultListener
  -     * @throws AxisFault
  -     */
       public void setMessageExchangeFaultListener(
  -            MessageExchangeFaultListener listener)
  -            throws AxisFault;
  -
  -    /**
  -     * Allows MessageExchange consumers low level access
  -     * to the Send message channel
  -     * @return MessageChannel
  -     */
  -    public MessageChannel getSendChannel();
  -
  -    /**
  -     * Allows MessageExchange consumers low level access
  -     * to the Receive message channel
  -     * @return MessageChannel
  -     */
  -    public MessageChannel getReceiveChannel();
  -
  +            MessageExchangeFaultListener listener);
  +            
  +    public MessageExchangeFaultListener getMessageExchangeFaultListener();
  +    
  +    public void setMessageExchangeStatusListener(
  +            MessageExchangeStatusListener listener);
  +            
  +    public MessageExchangeStatusListener getMessageExchangeStatusListener();
  +        
   }
  
  
  
  1.4       +2 -2      xml-axis/java/src/org/apache/axis/ime/MessageExchangeCorrelatorService.java
  
  Index: MessageExchangeCorrelatorService.java
  ===================================================================
  RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/MessageExchangeCorrelatorService.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- MessageExchangeCorrelatorService.java	28 Oct 2002 21:45:59 -0000	1.3
  +++ MessageExchangeCorrelatorService.java	29 Oct 2002 05:15:30 -0000	1.4
  @@ -61,9 +61,9 @@
   
       public void put(
               MessageExchangeCorrelator correlator,
  -            MessageExchangeContext context);
  +            Object context);
   
  -    public MessageExchangeContext get(
  +    public Object get(
               MessageExchangeCorrelator correlator);
   
   }
  
  
  
  1.1                  xml-axis/java/src/org/apache/axis/ime/MessageContextListener.java
  
  Index: MessageContextListener.java
  ===================================================================
  /*
   * The Apache Software License, Version 1.1
   *
   *
   * Copyright (c) 2001 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution,
   *    if any, must include the following acknowledgment:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowledgment may appear in the software itself,
   *    if and wherever such third-party acknowledgments normally appear.
   *
   * 4. The names "Axis" and "Apache Software Foundation" must
   *    not be used to endorse or promote products derived from this
   *    software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache",
   *    nor may "Apache" appear in their name, without prior written
   *    permission of the Apache Software Foundation.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   */
  package org.apache.axis.ime;
  
  import java.io.Serializable;
  import org.apache.axis.MessageContext;
  
  /**
   * Serializable base class for listeners interested in faults,
   * received messages and status updates.  Every callback has an
   * empty implementation, so subclasses override only the ones
   * they need.
   * 
   * @author James M Snell (jasnell@us.ibm.com)
   */
  public abstract class MessageContextListener implements Serializable {
  
      /**
       * Fault callback; does nothing by default.
       * 
       * @param correlator the correlator associated with the fault
       * @param exception the fault being reported
       */
      public void onFault(
              MessageExchangeCorrelator correlator,
              Throwable exception) {
          // intentionally empty
      }
  
      /**
       * Message callback; does nothing by default.
       * 
       * @param correlator the correlator associated with the message
       * @param context the received message context
       */
      public void onReceive(
              MessageExchangeCorrelator correlator,
              MessageContext context) {
          // intentionally empty
      }
  
      /**
       * Status callback; does nothing by default.
       * 
       * @param correlator the correlator associated with the status
       * @param status the status being reported
       */
      public void onStatus(
              MessageExchangeCorrelator correlator,
              MessageExchangeStatus status) {
          // intentionally empty
      }
  
  }
  
  
  
  1.1                  xml-axis/java/src/org/apache/axis/ime/internal/util/KeyedBuffer.java
  
  Index: KeyedBuffer.java
  ===================================================================
  /*
   * The Apache Software License, Version 1.1
   *
   *
   * Copyright (c) 2001 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution,
   *    if any, must include the following acknowledgment:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowledgment may appear in the software itself,
   *    if and wherever such third-party acknowledgments normally appear.
   *
   * 4. The names "Axis" and "Apache Software Foundation" must
   *    not be used to endorse or promote products derived from this
   *    software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache",
   *    nor may "Apache" appear in their name, without prior written
   *    permission of the Apache Software Foundation.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   */
  package org.apache.axis.ime.internal.util;
  
  /**
   * A KeyedBuffer is a low-level hybrid of a FIFO queue and a keyed map.
   * Each MessageExchange implementation will create at least two
   * KeyedBuffers, one for messages being sent, and another for
   * messages that have been received.
   * 
   * KeyedBuffers differ from traditional FIFO Queues in that 
   * elements put in are keyed and can be taken out of order.
   * 
   * Different implementations may allow for variations on 
   * how the KeyedBuffer model is implemented.  For instance,
   * the code will ship with a NonPersistentKeyedBuffer that
   * will store all contained objects in memory.  The fact that 
   * everything is stored in memory means that the buffer is not 
   * fault tolerant.  If fault tolerance is required, then a 
   * Persistent KeyedBuffer must be created that persists the 
   * objects somehow.
   * 
   * @author James M Snell (jasnell@us.ibm.com)
   */
  public interface KeyedBuffer {
  
      /**
       * Select, but do not remove the next message on the
       * channel.  If one does not exist, return null
       */
      public Object peek();
  
      /**
       * Put a message onto the channel
       */
      public void put(
              Object key,
              Object context);
  
      /**
       * Cancel a message that has been put on the channel.
       * Unlike select(Object key), this method will not block
       * and wait for a message with the specified key to be
       * put onto the MessageChannel.
       */
      public Object cancel(
              Object key);
  
      /**
       * Select and remove all of the messages currently in
       * the channel (useful for bulk operations).  This 
       * method will not block.  It is also not guaranteed
       * that the Channel will be empty once this operation
       * returns (it is possible that another thread may 
       * put new MessageContexts into the channel before this
       * operation completes)
       */
      public Object[] selectAll();
  
      /**
       * Select and remove the next message in the channel
       * If a message is not available, wait indefinitely for one
       */
      public Object select()
              throws InterruptedException;
  
      /**
       * Select and remove the next message in the channel
       * If a message is not available, wait the specified amount
       * of time for one
       */
      public Object select(
              long timeout)
              throws InterruptedException;
  
      /**
       * Select and remove a specific message in the channel
       * If the message is not available, wait indefinitely 
       * for one to be available
       */
      public Object select(
              Object key)
              throws InterruptedException;
  
      /**
       * Select and remove a specific message in the channel
       * If the message is not available, wait the specified 
       * amount of time for one
       */
      public Object select(
              Object key,
              long timeout)
              throws InterruptedException;
      
      /**
       * Select and remove the next object in the buffer
       * (does not wait for a message to be put into the buffer)
       */        
      public Object get();
  
      /**
       * Select and remove the specified object in the buffer
       * (does not wait for a message to be put into the buffer)
       */            
      public Object get(Object key);
      
  }
  
  
  
  1.1                  xml-axis/java/src/org/apache/axis/ime/internal/util/WorkerPool.java
  
  Index: WorkerPool.java
  ===================================================================
  /*
   * The Apache Software License, Version 1.1
   *
   *
   * Copyright (c) 2001 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution,
   *    if any, must include the following acknowledgment:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowledgment may appear in the software itself,
   *    if and wherever such third-party acknowledgments normally appear.
   *
   * 4. The names "Axis" and "Apache Software Foundation" must
   *    not be used to endorse or promote products derived from this
   *    software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache",
   *    nor may "Apache" appear in their name, without prior written
   *    permission of the Apache Software Foundation.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   */
  
  package org.apache.axis.ime.internal.util;
  
  import org.apache.axis.i18n.Messages;
  
  import java.util.Hashtable;
  import java.util.Iterator;
  import java.util.Map;
  
  /**
   * A small pool that runs every registered worker on a thread of
   * its own and keeps count of the active workers so that callers
   * can request a shutdown and wait for the workers to finish.
   * 
   * The worker count and the shutdown flag are guarded by this
   * object's monitor.
   * 
   * @author James M Snell (jasnell@us.ibm.com)
   */
  public class WorkerPool {
  
      /** Maximum number of workers that may be registered at once. */
      public static final long MAX_THREADS = 100;
      
      /** Maps each registered worker (Runnable) to the Thread running it. */
      protected Map threads = new Hashtable();
      /** Not referenced anywhere within this class. */
      protected boolean interrupt;
      /** Number of workers currently registered with the pool. */
      protected long threadcount;
      /** Set once a shutdown has been requested; never reset. */
      public boolean _shutdown;
  
      /**
       * Returns true if shutdown has been requested and every
       * worker has reported that it is done.
       */
      public boolean isShutdown() {
          synchronized (this) {
              return _shutdown && threadcount == 0;
          }
      }
  
      /**
       * Returns true once a shutdown has been requested, whether or
       * not the workers have finished yet.
       */
      public boolean isShuttingDown() {
          synchronized (this) {
              return _shutdown;
          }
      }
  
      /**
       * Returns the total number of currently active workers
       */
      public long getWorkerCount() {
          synchronized (this) {
              return threadcount;
          }
      }
  
      /**
       * Adds a new worker to the pool and starts it on a new thread.
       * 
       * The method holds the pool monitor so that the shutdown flag
       * and the worker count are checked and updated atomically with
       * respect to the other methods of this class.
       * 
       * @param worker the worker to run
       * @throws IllegalStateException if the pool is shutting down
       *         or already holds MAX_THREADS workers
       */
      public synchronized void addWorker(
              Runnable worker) {
          // ">=" rather than "==" so the limit still holds should the
          // count ever have moved past the maximum.
          if (_shutdown ||
              threadcount >= MAX_THREADS) {
              throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
          }
          Thread thread = new Thread(worker);
          threads.put(worker, thread);
          threadcount++;
          thread.start();
      }
  
      /**
       * Forcefully interrupt all workers
       */
      public void interruptAll() {
          // Hold the map's own lock so that it cannot be modified
          // while it is being iterated.
          synchronized (threads) {
              for (Iterator i = threads.values().iterator(); i.hasNext();) {
                  Thread t = (Thread) i.next();
                  t.interrupt();
              }
          }
      }
  
      /**
       * Forcefully shutdown the pool: flags the pool as shutting
       * down and interrupts every worker thread.
       */
      public void shutdown() {
          synchronized (this) {
              _shutdown = true;
          }
          interruptAll();
      }
  
      /**
       * Flags the pool as shutting down without interrupting the
       * worker threads.
       */
      public void safeShutdown() {
          synchronized (this) {
              _shutdown = true;
          }
      }
  
      /**
       * Waits until every worker has reported that it is done.
       * 
       * @throws IllegalStateException if no shutdown has been requested
       * @throws InterruptedException if the wait is interrupted
       */
      public synchronized void awaitShutdown()
              throws InterruptedException {
          if (!_shutdown) {
              throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
          }
          while (threadcount > 0) {
              wait();
          }
      }
  
      /**
       * Waits at most the given number of milliseconds for every
       * worker to report that it is done.
       * 
       * @param timeout maximum time to wait, in milliseconds; a value
       *        of zero or less means "do not wait at all"
       * @return true if no workers remain, false if the timeout
       *         elapsed first
       * @throws IllegalStateException if no shutdown has been requested
       * @throws InterruptedException if the wait is interrupted
       */
      public synchronized boolean awaitShutdown(long timeout)
              throws InterruptedException {
          if (!_shutdown) {
              throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
          }
          if (threadcount == 0) {
              return true;
          }
          if (timeout <= 0) {
              return false;
          }
          long start = System.currentTimeMillis();
          long waittime = timeout;
          for (; ;) {
              wait(waittime);
              if (threadcount == 0) {
                  return true;
              }
              // Remaining time is the timeout minus the time already
              // spent waiting, not minus the absolute clock value.
              waittime = timeout - (System.currentTimeMillis() - start);
              if (waittime <= 0) {
                  return false;
              }
          }
      }
  
      /**
       * Used by workers to notify the pool that they are done.
       * 
       * When the pool is shutting down and this was the last worker,
       * threads blocked in awaitShutdown are woken up.  When the pool
       * is not shutting down, the worker is registered again and
       * restarted on a new thread.
       * 
       * @param worker the worker that has finished
       */
      public synchronized void workerDone(
              Runnable worker) {
          threads.remove(worker);
          if (--threadcount == 0 && _shutdown) {
              notifyAll();
          }
          if (!_shutdown) {
              addWorker(worker);
          }
      }
  }
  
  
  
  
  1.1                  xml-axis/java/src/org/apache/axis/ime/internal/util/NonPersistentKeyedBuffer.java
  
  Index: NonPersistentKeyedBuffer.java
  ===================================================================
  /*
   * The Apache Software License, Version 1.1
   *
   *
   * Copyright (c) 2001 The Apache Software Foundation.  All rights
   * reserved.
   *
   * Redistribution and use in source and binary forms, with or without
   * modification, are permitted provided that the following conditions
   * are met:
   *
   * 1. Redistributions of source code must retain the above copyright
   *    notice, this list of conditions and the following disclaimer.
   *
   * 2. Redistributions in binary form must reproduce the above copyright
   *    notice, this list of conditions and the following disclaimer in
   *    the documentation and/or other materials provided with the
   *    distribution.
   *
   * 3. The end-user documentation included with the redistribution,
   *    if any, must include the following acknowledgment:
   *       "This product includes software developed by the
   *        Apache Software Foundation (http://www.apache.org/)."
   *    Alternately, this acknowledgment may appear in the software itself,
   *    if and wherever such third-party acknowledgments normally appear.
   *
   * 4. The names "Axis" and "Apache Software Foundation" must
   *    not be used to endorse or promote products derived from this
   *    software without prior written permission. For written
   *    permission, please contact apache@apache.org.
   *
   * 5. Products derived from this software may not be called "Apache",
   *    nor may "Apache" appear in their name, without prior written
   *    permission of the Apache Software Foundation.
   *
   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
   * DISCLAIMED.  IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
   * SUCH DAMAGE.
   * ====================================================================
   *
   * This software consists of voluntary contributions made by many
   * individuals on behalf of the Apache Software Foundation.  For more
   * information on the Apache Software Foundation, please see
   * <http://www.apache.org/>.
   */
  
  package org.apache.axis.ime.internal.util;
  
  import org.apache.axis.i18n.Messages;
  import java.util.Vector;
  
  /**
   * Creates a non-persistent KeyedBuffer.  Queued messages
   * are stored in memory. If the buffer instance is destroyed,
   * so is the Queue.
   * 
   * Every access to the internal queue, including the waiting done
   * by the blocking select methods, happens while holding the
   * queue's monitor.
   * 
   * @author James M Snell (jasnell@us.ibm.com)
   */
  public class NonPersistentKeyedBuffer
          implements KeyedBuffer {
  
      /** The queued nodes; also the monitor used for locking and waiting. */
      private final KeyedQueue messages = new KeyedQueue();
  
      /** Pool consulted by the blocking selects to detect shutdown. */
      private WorkerPool WORKERS;
  
      /**
       * @param workers the pool whose shutdown state aborts blocking selects
       */
      public NonPersistentKeyedBuffer(
              WorkerPool workers) {
          this.WORKERS = workers;
      }
  
      /**
       * Returns the value at the head of the queue without removing it.
       * 
       * @return the head value, or null if the buffer is empty
       */
      public Object peek() {
          synchronized (messages) {
              // Read the value under the lock: a concurrent remover
              // clears the node's fields once it has taken the node.
              KeyedNode node = messages.peek();
              return (node != null) ? node.value : null;
          }
      }
  
      /**
       * Appends an object to the buffer under the given key.
       * 
       * @param key    the key, must not be null
       * @param object the object to store, must not be null
       * @throws IllegalArgumentException if either argument is null
       */
      public void put(
              Object key,
              Object object) {
  
          if (key == null ||
                  object == null) {
              throw new IllegalArgumentException(Messages.getMessage("illegalArgumentException00"));
          }
  
          synchronized (messages) {
              messages.put(new KeyedNode(key, object));
              // Wake every waiter: with notify() the single thread woken
              // might be waiting for a different key and go back to
              // sleep, leaving the thread that wants this key asleep.
              messages.notifyAll();
          }
      }
  
      /**
       * Removes the object stored under the given key without waiting
       * for it to arrive.
       * 
       * @param key the key, must not be null
       * @return the removed object, or null if no object is stored
       *         under the key
       * @throws IllegalArgumentException if key is null
       */
      public Object cancel(Object key) {
          if (key == null) {
              throw new IllegalArgumentException(Messages.getMessage("illegalArgumentException00"));
          }
          synchronized (messages) {
              // select(key) finds and unlinks the node, or returns null.
              return release(messages.select(key));
          }
      }
  
      /**
       * Removes and returns everything currently in the buffer,
       * in queue order.  Does not block.
       * 
       * @return the removed objects (possibly an empty array)
       */
      public Object[] selectAll() {
          Vector v = new Vector();
          synchronized (messages) {
              KeyedNode node;
              while ((node = messages.select()) != null) {
                  v.add(release(node));
              }
          }
          Object[] objects = new
                  Object[v.size()];
          v.copyInto(objects);
          return objects;
      }
  
      /**
       * Removes and returns the next object, waiting indefinitely
       * for one to arrive if the buffer is empty.
       * 
       * @return the removed object
       * @throws IllegalStateException if the worker pool is shutting down
       * @throws InterruptedException if the wait is interrupted
       */
      public Object select()
              throws InterruptedException {
          return take(null, false, 0, false);
      }
  
      /**
       * Removes and returns the next object, waiting at most the
       * given time for one to arrive.
       * 
       * @param timeout maximum time to wait in milliseconds; zero or
       *        less means "do not wait"
       * @return the removed object, or null if none arrived in time
       * @throws IllegalStateException if the worker pool is shutting down
       * @throws InterruptedException if the wait is interrupted
       */
      public Object select(long timeout)
              throws InterruptedException {
          return take(null, false, timeout, true);
      }
  
      /**
       * Removes and returns the object stored under the given key,
       * waiting indefinitely for it to arrive.
       * 
       * @param key the key, must not be null
       * @return the removed object
       * @throws IllegalArgumentException if key is null
       * @throws IllegalStateException if the worker pool is shutting down
       * @throws InterruptedException if the wait is interrupted
       */
      public Object select(Object key)
              throws InterruptedException {
          requireKey(key);
          return take(key, true, 0, false);
      }
  
      /**
       * Removes and returns the object stored under the given key,
       * waiting at most the given time for it to arrive.
       * 
       * @param key     the key, must not be null
       * @param timeout maximum time to wait in milliseconds; zero or
       *        less means "do not wait"
       * @return the removed object, or null if it did not arrive in time
       * @throws IllegalArgumentException if key is null
       * @throws IllegalStateException if the worker pool is shutting down
       * @throws InterruptedException if the wait is interrupted
       */
      public Object select(Object key, long timeout)
              throws InterruptedException {
          requireKey(key);
          return take(key, true, timeout, true);
      }
  
      /**
       * Removes and returns the next object without waiting.
       * 
       * @return the removed object, or null if the buffer is empty
       */
      public Object get() {
          synchronized (messages) {
              return release(messages.select());
          }
      }
  
      /**
       * Removes and returns the object stored under the given key
       * without waiting.
       * 
       * @param key the key to look for
       * @return the removed object, or null if there is none
       */
      public Object get(Object key) {
          synchronized (messages) {
              return release(messages.select(key));
          }
      }
  
      /** Rejects a null key; no node can ever match one, so a wait would never end. */
      private static void requireKey(Object key) {
          if (key == null) {
              throw new IllegalArgumentException(Messages.getMessage("illegalArgumentException00"));
          }
      }
  
      /** Extracts the value of a removed node and clears the node; null-safe. */
      private static Object release(KeyedNode node) {
          if (node == null) {
              return null;
          }
          Object object = node.value;
          node.key = null;
          node.value = null;
          return object;
      }
  
      /**
       * Shared implementation of the blocking selects.
       * 
       * @param key     key to match when byKey is true
       * @param byKey   true to take the node with the given key,
       *                false to take the head of the queue
       * @param timeout wait limit in milliseconds when timed is true
       * @param timed   false to wait without limit
       * @return the removed object, or null when a timed wait ran out
       */
      private Object take(Object key, boolean byKey, long timeout, boolean timed)
              throws InterruptedException {
          long start = timed ? System.currentTimeMillis() : 0;
          // Checking and waiting both happen inside one synchronized
          // block: wait() requires ownership of the monitor, and a put()
          // cannot slip in between the failed check and the wait.
          synchronized (messages) {
              for (; ;) {
                  if (WORKERS.isShuttingDown()) {
                      throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
                  }
                  KeyedNode node = byKey ? messages.select(key) : messages.select();
                  if (node != null) {
                      return release(node);
                  }
                  if (!timed) {
                      messages.wait();
                      continue;
                  }
                  long remaining = timeout - (System.currentTimeMillis() - start);
                  if (remaining <= 0) {
                      return null;
                  }
                  messages.wait(remaining);
              }
          }
      }
      
  /// Support Classes ///
      /** A singly linked queue entry holding one key/value pair. */
      protected static class KeyedNode {
          public Object key;
          public Object value;
          public KeyedNode next;
  
          public KeyedNode() {
          }
  
          public KeyedNode(
                  Object key,
                  Object value) {
              this.key = key;
              this.value = value;
          }
  
          public KeyedNode(
                  Object key,
                  Object value,
                  KeyedNode next) {
              this(key, value);
              this.next = next;
          }
      }
  
      /**
       * A minimal singly linked FIFO queue of KeyedNodes.  It does no
       * locking of its own.
       */
      protected static class KeyedQueue {
  
          protected KeyedNode head;
          protected KeyedNode last;
  
          /** Appends the node at the tail of the queue. */
          protected void put(KeyedNode node) {
              if (last == null) {
                  last = head = node;
              } else {
                  last = last.next = node;
              }
          }
  
          /** Unlinks and returns the head node, or null if the queue is empty. */
          protected KeyedNode select() {
              KeyedNode node = head;
              if (node == null) {
                  return null;
              }
              head = node.next;
              if (head == null) {
                  last = null;
              }
              node.next = null;
              return node;
          }
  
          /**
           * Unlinks and returns the first node whose key equals the
           * given key, or null if there is no such node.
           */
          protected KeyedNode select(Object key) {
              KeyedNode previous = null;
              for (KeyedNode node = head; node != null; node = node.next) {
                  if (node.key != null && node.key.equals(key)) {
                      if (previous != null) {
                          previous.next = node.next;
                      } else {
                          // Removing the first node: advance head,
                          // otherwise the rest of the queue is lost.
                          head = node.next;
                      }
                      if (node == last) {
                          // Removing the final node: pull the tail
                          // back so later puts link onto a live node.
                          last = previous;
                      }
                      node.next = null;
                      return node;
                  }
                  previous = node;
              }
              return null;
          }
  
          /** Returns the head node without unlinking it, or null if empty. */
          protected KeyedNode peek() {
              return head;
          }
  
      }
  
  }