You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by jm...@apache.org on 2002/10/29 06:15:30 UTC
cvs commit: xml-axis/java/src/org/apache/axis/ime/internal/util KeyedBuffer.java WorkerPool.java NonPersistentKeyedBuffer.java
jmsnell 2002/10/28 21:15:30
Modified: java/src/org/apache/axis/ime/internal
NonPersistentMessageExchangeCorrelatorService.java
MessageExchangeProvider.java
MessageExchangeImpl.java
java/src/org/apache/axis/ime MessageExchange.java
MessageExchangeCorrelatorService.java
Added: java/src/org/apache/axis/ime/internal/util/handler
HandlerWrapper.java
java/src/org/apache/axis/ime/internal
MessageExchangeSendContext.java
MessageExchangeSendListener.java
FirstComeFirstServeDispatchPolicy.java
MessageExchangeReceiveContext.java
ReceivedMessageDispatchPolicy.java
java/src/org/apache/axis/ime MessageContextListener.java
java/src/org/apache/axis/ime/internal/util KeyedBuffer.java
WorkerPool.java NonPersistentKeyedBuffer.java
Removed: java/src/org/apache/axis/ime/internal/util/handler
HandlerWrapper1.java HandlerWrapper2.java
java/src/org/apache/axis/ime/internal
MessageExchangeProvider1.java MessageWorker.java
MessageExchangeProvider2.java
NonPersistentMessageChannel.java
MessageWorkerGroup.java
java/src/org/apache/axis/ime MessageChannel.java
MessageExchangeReceiveListener.java
MessageExchangeContextListener.java
MessageExchangeContext.java
Log:
Some fairly significant changes here. I went through and simplified the
MessageExchange interface and org.apache.axis.ime package and reworked
the internal impl quite a bit to improve the way sending and receiving
is done. This still needs to be tested, but everything builds.
Revision Changes Path
1.1 xml-axis/java/src/org/apache/axis/ime/internal/util/handler/HandlerWrapper.java
Index: HandlerWrapper.java
===================================================================
/*
* The Apache Software License, Version 1.1
*
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Axis" and "Apache Software Foundation" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache",
* nor may "Apache" appear in their name, without prior written
* permission of the Apache Software Foundation.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
package org.apache.axis.ime.internal.util.handler;
import org.apache.axis.Handler;
import org.apache.axis.MessageContext;
import org.apache.axis.ime.MessageExchangeCorrelator;
import org.apache.axis.ime.MessageContextListener;
import org.apache.axis.ime.MessageExchangeFaultListener;
import org.apache.axis.ime.internal.MessageExchangeProvider;
import org.apache.axis.ime.internal.MessageExchangeSendContext;
import org.apache.axis.ime.internal.MessageExchangeSendListener;
import org.apache.axis.ime.internal.ReceivedMessageDispatchPolicy;
import org.apache.axis.ime.internal.FirstComeFirstServeDispatchPolicy;
/**
 * Adapts a synchronous {@link Handler} (e.g. an Axis 1.0 transport) to the
 * {@link MessageExchangeProvider} model by supplying the send listener and
 * the received-message dispatch policy that the provider asks for.
 *
 * @author James M Snell (jasnell@us.ibm.com)
 */
public class HandlerWrapper
        extends MessageExchangeProvider {

    /** The synchronous handler every send is delegated to. */
    private Handler handler;

    /**
     * @param handler the synchronous handler to wrap
     */
    public HandlerWrapper(Handler handler) {
        this.handler = handler;
    }

    /**
     * Creates a new {@link Listener} bound to the wrapped handler.
     *
     * @return a fresh listener instance on every call
     */
    protected MessageExchangeSendListener getMessageExchangeSendListener() {
        return new Listener(handler);
    }

    /**
     * Creates a first-come-first-serve policy over this provider's
     * {@code RECEIVE} and {@code RECEIVE_REQUESTS} buffers.
     *
     * @return a fresh policy instance on every call
     */
    protected ReceivedMessageDispatchPolicy getReceivedMessageDispatchPolicy() {
        return new FirstComeFirstServeDispatchPolicy(RECEIVE, RECEIVE_REQUESTS);
    }

    /**
     * Send listener that runs the wrapped handler on the calling thread and
     * then hands the same send context to the enclosing provider's
     * {@code RECEIVE} buffer.
     */
    public class Listener
            implements MessageExchangeSendListener {

        /** Handler invoked for each send context. */
        private Handler handler;

        /**
         * @param handler the handler to invoke from {@link #onSend}
         */
        public Listener(Handler handler) {
            this.handler = handler;
        }

        /**
         * Invokes the handler with the context's message context and, on
         * success, stores the context in {@code RECEIVE} under its
         * correlator. An {@link Exception} raised along the way is passed to
         * the context's fault listener when one is set; with no fault
         * listener the exception is dropped.
         *
         * @param context the send context to process
         */
        public void onSend(MessageExchangeSendContext context) {
            // Looked up before the try block so a failure here is not
            // routed to the fault listener.
            final MessageExchangeFaultListener faultSink =
                    context.getMessageExchangeFaultListener();
            try {
                final MessageContext outbound = context.getMessageContext();
                final MessageExchangeCorrelator key =
                        context.getMessageExchangeCorrelator();
                // Open question carried over from the original author:
                // should the handler's init and cleanup be done in here?
                handler.invoke(outbound);
                RECEIVE.put(key, context);
            } catch (Exception failure) {
                if (faultSink == null) {
                    return;
                }
                faultSink.onFault(
                        context.getMessageExchangeCorrelator(),
                        failure);
            }
        }
    }
}
1.3 +3 -4 xml-axis/java/src/org/apache/axis/ime/internal/NonPersistentMessageExchangeCorrelatorService.java
Index: NonPersistentMessageExchangeCorrelatorService.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/NonPersistentMessageExchangeCorrelatorService.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- NonPersistentMessageExchangeCorrelatorService.java 28 Oct 2002 13:50:40 -0000 1.2
+++ NonPersistentMessageExchangeCorrelatorService.java 29 Oct 2002 05:15:29 -0000 1.3
@@ -55,7 +55,6 @@
package org.apache.axis.ime.internal;
-import org.apache.axis.ime.MessageExchangeContext;
import org.apache.axis.ime.MessageExchangeCorrelator;
import org.apache.axis.ime.MessageExchangeCorrelatorService;
@@ -74,7 +73,7 @@
*/
public void put(
MessageExchangeCorrelator correlator,
- MessageExchangeContext context) {
+ Object context) {
synchronized (contexts) {
contexts.put(correlator, context);
}
@@ -83,9 +82,9 @@
/**
* @see org.apache.axis.ime.MessageExchangeCorrelatorService#get(MessageExchangeCorrelator)
*/
- public MessageExchangeContext get(MessageExchangeCorrelator correlator) {
+ public Object get(MessageExchangeCorrelator correlator) {
synchronized (contexts) {
- return (MessageExchangeContext) contexts.remove(correlator);
+ return contexts.remove(correlator);
}
}
1.3 +123 -11 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeProvider.java
Index: MessageExchangeProvider.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeProvider.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- MessageExchangeProvider.java 28 Oct 2002 13:50:40 -0000 1.2
+++ MessageExchangeProvider.java 29 Oct 2002 05:15:29 -0000 1.3
@@ -55,38 +55,68 @@
package org.apache.axis.ime.internal;
-import org.apache.axis.ime.MessageChannel;
+import org.apache.axis.i18n.Messages;
+import org.apache.axis.MessageContext;
import org.apache.axis.ime.MessageExchange;
+import org.apache.axis.ime.MessageContextListener;
+import org.apache.axis.ime.MessageExchangeCorrelator;
import org.apache.axis.ime.MessageExchangeFactory;
+import org.apache.axis.ime.MessageExchangeFaultListener;
+import org.apache.axis.ime.internal.util.WorkerPool;
+import org.apache.axis.ime.internal.util.KeyedBuffer;
+import org.apache.axis.ime.internal.util.NonPersistentKeyedBuffer;
/**
- * Serves as a base class for MessageExchangeProviders that
- * need to thread pooling on send AND receive message
- * flows (as opposed to MessageExchangeProvider2 which only
- * does thread pooling on send flows).
- *
* @author James M Snell (jasnell@us.ibm.com)
*/
public abstract class MessageExchangeProvider
implements MessageExchangeFactory {
+ public static final long SELECT_TIMEOUT = 1000 * 30;
public static final long DEFAULT_THREAD_COUNT = 5;
- protected final MessageWorkerGroup WORKERS = new MessageWorkerGroup();
- protected final MessageChannel SEND = new NonPersistentMessageChannel(WORKERS);
- protected final MessageChannel RECEIVE = new NonPersistentMessageChannel(WORKERS);
+ protected final WorkerPool WORKERS = new WorkerPool();
+ protected final KeyedBuffer SEND = new NonPersistentKeyedBuffer(WORKERS);
+ protected final KeyedBuffer RECEIVE = new NonPersistentKeyedBuffer(WORKERS);
+ protected final KeyedBuffer RECEIVE_REQUESTS = new NonPersistentKeyedBuffer(WORKERS);
protected boolean initialized = false;
+ protected abstract MessageExchangeSendListener getMessageExchangeSendListener();
+
+ protected abstract ReceivedMessageDispatchPolicy getReceivedMessageDispatchPolicy();
+
public MessageExchange createMessageExchange() {
- return new MessageExchangeImpl(this, SEND, RECEIVE);
+ return new MessageExchangeImpl(this);
}
public void init() {
init(DEFAULT_THREAD_COUNT);
}
- public abstract void init(long THREAD_COUNT);
+ public void init(long THREAD_COUNT) {
+ if (initialized)
+ throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
+ for (int n = 0; n < THREAD_COUNT; n++) {
+ WORKERS.addWorker(new MessageSender(WORKERS, SEND, getMessageExchangeSendListener()));
+ WORKERS.addWorker(new MessageReceiver(WORKERS, RECEIVE, getReceivedMessageDispatchPolicy()));
+ }
+ initialized = true;
+ }
+
+ public void processReceive(
+ MessageExchangeReceiveContext context) {
+ RECEIVE_REQUESTS.put(
+ context.getMessageExchangeCorrelator(),
+ context);
+ }
+
+ public void processSend(
+ MessageExchangeSendContext context) {
+ SEND.put(
+ context.getMessageExchangeCorrelator(),
+ context);
+ }
public void shutdown() {
shutdown(false);
@@ -108,6 +138,88 @@
public void awaitShutdown(long shutdown)
throws InterruptedException {
WORKERS.awaitShutdown(shutdown);
+ }
+
+
+
+ // -- Worker Classes --- //
+ public static class MessageReceiver
+ implements Runnable {
+
+ protected WorkerPool pool;
+ protected KeyedBuffer channel;
+ protected ReceivedMessageDispatchPolicy policy;
+
+ protected MessageReceiver(
+ WorkerPool pool,
+ KeyedBuffer channel,
+ ReceivedMessageDispatchPolicy policy) {
+ this.pool = pool;
+ this.channel = channel;
+ this.policy = policy;
+ }
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ try {
+ while (!pool.isShuttingDown()) {
+ MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT);
+ policy.dispatch(context);
+ }
+ } catch (Throwable t) {
+ // kill the thread if any type of exception occurs.
+ // don't worry, we'll create another one to replace it
+ // if we're not currently in the process of shutting down.
+ // once I get the logging function plugged in, we'll
+ // log whatever errors do occur
+ } finally {
+ pool.workerDone(this);
+ }
+ }
+
+ }
+
+
+
+ public static class MessageSender
+ implements Runnable {
+
+ protected WorkerPool pool;
+ protected KeyedBuffer channel;
+ protected MessageExchangeSendListener listener;
+
+ protected MessageSender(
+ WorkerPool pool,
+ KeyedBuffer channel,
+ MessageExchangeSendListener listener) {
+ this.pool = pool;
+ this.channel = channel;
+ this.listener = listener;
+ }
+
+ /**
+ * @see java.lang.Runnable#run()
+ */
+ public void run() {
+ try {
+ while (!pool.isShuttingDown()) {
+ MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT);
+ if (context != null)
+ listener.onSend(context);
+ }
+ } catch (Throwable t) {
+ // kill the thread if any type of exception occurs.
+ // don't worry, we'll create another one to replace it
+ // if we're not currently in the process of shutting down.
+ // once I get the logging function plugged in, we'll
+ // log whatever errors do occur
+ } finally {
+ pool.workerDone(this);
+ }
+ }
+
}
}
1.3 +112 -160 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeImpl.java
Index: MessageExchangeImpl.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeImpl.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- MessageExchangeImpl.java 28 Oct 2002 13:50:40 -0000 1.2
+++ MessageExchangeImpl.java 29 Oct 2002 05:15:29 -0000 1.3
@@ -58,16 +58,14 @@
import org.apache.axis.AxisFault;
import org.apache.axis.MessageContext;
import org.apache.axis.i18n.Messages;
-import org.apache.axis.ime.MessageChannel;
import org.apache.axis.ime.MessageExchange;
import org.apache.axis.ime.MessageExchangeConstants;
-import org.apache.axis.ime.MessageExchangeContext;
-import org.apache.axis.ime.MessageExchangeContextListener;
-import org.apache.axis.ime.MessageExchangeCorrelator;
import org.apache.axis.ime.MessageExchangeFaultListener;
-import org.apache.axis.ime.MessageExchangeLifecycle;
-import org.apache.axis.ime.MessageExchangeReceiveListener;
import org.apache.axis.ime.MessageExchangeStatusListener;
+import org.apache.axis.ime.MessageExchangeCorrelator;
+import org.apache.axis.ime.MessageExchangeCorrelatorService;
+import org.apache.axis.ime.MessageContextListener;
+import org.apache.axis.ime.MessageExchangeLifecycle;
import org.apache.axis.ime.internal.util.uuid.UUIDGenFactory;
/**
@@ -76,25 +74,19 @@
public class MessageExchangeImpl
implements MessageExchange, MessageExchangeLifecycle {
+ private static final long NO_TIMEOUT = -1;
public static final long WORKER_COUNT = 5;
public static final long DEFAULT_TIMEOUT = 1000 * 20;
- private MessageExchangeProvider provider;
- private MessageChannel send;
- private MessageChannel receive;
- private MessageExchangeReceiveListener receiveListener;
- private MessageExchangeStatusListener statusListener;
- private MessageExchangeFaultListener faultListener;
private MessageWorkerGroup workers = new MessageWorkerGroup();
+ private MessageExchangeFaultListener faultListener;
+ private MessageExchangeStatusListener statusListener;
+ private MessageExchangeProvider provider;
private boolean listening = false;
protected Holder holder;
public MessageExchangeImpl(
- MessageExchangeProvider provider,
- MessageChannel sendChannel,
- MessageChannel receiveChannel) {
- this.send = sendChannel;
- this.receive = receiveChannel;
+ MessageExchangeProvider provider) {
}
/**
@@ -103,6 +95,16 @@
public MessageExchangeCorrelator send(
MessageContext context)
throws AxisFault {
+ return send(context,null); // should do default listener
+ }
+
+ /**
+ * @see org.apache.axis.ime.MessageExchange#send(MessageContext)
+ */
+ public MessageExchangeCorrelator send(
+ MessageContext context,
+ MessageContextListener listener)
+ throws AxisFault {
MessageExchangeCorrelator correlator =
(MessageExchangeCorrelator) context.getProperty(
MessageExchangeConstants.MESSAGE_CORRELATOR_PROPERTY);
@@ -113,88 +115,64 @@
MessageExchangeConstants.MESSAGE_CORRELATOR_PROPERTY,
correlator);
}
- MessageExchangeContext meContext =
- MessageExchangeContext.newInstance(
- correlator,
- statusListener,
- receiveListener,
- faultListener,
- context);
- send.put(correlator, meContext);
+ if (listener != null) {
+ provider.processReceive(
+ MessageExchangeReceiveContext.newInstance(
+ correlator,
+ listener,
+ faultListener,
+ statusListener));
+ }
+ provider.processSend(
+ MessageExchangeSendContext.newInstance(
+ correlator,
+ context,
+ faultListener,
+ statusListener));
return correlator;
}
/**
- * @see org.apache.axis.ime.MessageExchange#setMessageExchangeStatusListener(MessageExchangeStatusListener)
- */
- public void setMessageExchangeStatusListener(
- MessageExchangeStatusListener listener)
- throws AxisFault {
- if (listening)
- throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
- this.statusListener = listener;
- }
-
- /**
- * @see org.apache.axis.ime.MessageExchange#setMessageExchangeReceiveListener(MessageExchangeReceiveListener)
+ * @see org.apache.axis.ime.MessageExchange#receive()
*/
- public void setMessageExchangeReceiveListener(
- MessageExchangeReceiveListener listener)
+ public MessageContext receive()
throws AxisFault {
- if (listening)
- throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
- this.receiveListener = listener;
+ return receive(null,NO_TIMEOUT);
}
/**
- * @see org.apache.axis.ime.MessageExchange#setMessageExchangeReceiveListener(MessageExchangeReceiveListener)
+ * @see org.apache.axis.ime.MessageExchange#receive(long)
*/
- public void setMessageExchangeFaultListener(
- MessageExchangeFaultListener listener)
+ public MessageContext receive(
+ long timeout)
throws AxisFault {
- if (listening)
- throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
- this.faultListener = listener;
+ return receive(null,timeout);
}
/**
- * @see org.apache.axis.ime.MessageExchange#cancel(MessageExchangeCorrelator)
+ * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator)
*/
- public MessageContext cancel(
- MessageExchangeCorrelator correlator)
+ public MessageContext receive(
+ MessageExchangeCorrelator correlator)
throws AxisFault {
- MessageExchangeContext context = send.cancel(correlator);
- if (context != null)
- return context.getMessageContext();
- else
- return null;
+ return receive(correlator,NO_TIMEOUT);
}
/**
- * @see org.apache.axis.ime.MessageExchange#getReceiveChannel()
+ * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator,long)
*/
- public MessageChannel getReceiveChannel() {
- return receive;
- }
-
- /**
- * @see org.apache.axis.ime.MessageExchange#getSendChannel()
- */
- public MessageChannel getSendChannel() {
- return send;
- }
-
-
- public MessageContext sendAndReceive(
- MessageContext context)
+ public MessageContext receive(
+ MessageExchangeCorrelator correlator,
+ long timeout)
throws AxisFault {
holder = new Holder();
Listener listener = new Listener(holder);
- this.setMessageExchangeFaultListener(listener);
- this.setMessageExchangeReceiveListener(listener);
try {
- this.send(context);
- holder.waitForNotify();
+ this.receive(correlator,listener);
+ if (timeout != NO_TIMEOUT)
+ holder.waitForNotify(timeout);
+ else
+ holder.waitForNotify();
} catch (InterruptedException ie) {
throw AxisFault.makeFault(ie);
}
@@ -207,17 +185,54 @@
return null;
}
+ /**
+ * @see org.apache.axis.ime.MessageExchange#receive(MessageContextListener)
+ */
+ public void receive(
+ MessageContextListener listener)
+ throws AxisFault {
+ receive(null,listener);
+ }
+
+ /**
+ * @see org.apache.axis.ime.MessageExchange#receive(MessageExchangeCorrelator,MessageContextListener)
+ */
+ public void receive(
+ MessageExchangeCorrelator correlator,
+ MessageContextListener listener)
+ throws AxisFault {
+ provider.processReceive(
+ MessageExchangeReceiveContext.newInstance(
+ correlator,
+ listener,
+ faultListener,
+ statusListener));
+ }
+
+ /**
+ * @see org.apache.axis.ime.MessageExchange#sendAndReceive(MessageContext)
+ */
+ public MessageContext sendAndReceive(
+ MessageContext context)
+ throws AxisFault {
+ return sendAndReceive(context,NO_TIMEOUT);
+ }
+
+ /**
+ * @see org.apache.axis.ime.MessageExchange#sendAndReceive(MessageContext,long)
+ */
public MessageContext sendAndReceive(
MessageContext context,
long timeout)
throws AxisFault {
holder = new Holder();
Listener listener = new Listener(holder);
- this.setMessageExchangeFaultListener(listener);
- this.setMessageExchangeReceiveListener(listener);
try {
- this.send(context);
- holder.waitForNotify(timeout);
+ this.send(context,listener);
+ if (timeout != NO_TIMEOUT)
+ holder.waitForNotify(timeout);
+ else
+ holder.waitForNotify();
} catch (InterruptedException ie) {
throw AxisFault.makeFault(ie);
}
@@ -230,44 +245,8 @@
return null;
}
- /**
- * @see org.apache.axis.ime.MessageExchange#startListening()
- */
- public void startListening() {
- if (provider instanceof MessageExchangeProvider1)
- throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00"));
- for (int n = 0; n < WORKER_COUNT; n++) {
- workers.addWorker(receive, new ReceiverListener());
- }
- listening = true;
- }
- /**
- * @see org.apache.axis.ime.MessageExchange#startListening()
- */
- public void startListening(MessageExchangeCorrelator correlator) {
- throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException01", "Unsupported For Now"));
- }
-
- /**
- * @see org.apache.axis.ime.MessageExchange#stopListening()
- */
- public void stopListening() {
- stopListening(false);
- }
-
- /**
- * @see org.apache.axis.ime.MessageExchange#stopListening(boolean)
- */
- public void stopListening(boolean force) {
- if (provider instanceof MessageExchangeProvider1)
- throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00"));
- if (!force)
- workers.safeShutdown();
- else
- workers.shutdown();
- listening = false;
- }
+ // -- Utility Classes --- //
private class Holder {
private MessageExchangeCorrelator correlator;
@@ -305,8 +284,7 @@
}
public class Listener
- implements MessageExchangeReceiveListener,
- MessageExchangeFaultListener {
+ extends MessageContextListener {
protected Holder holder;
@@ -335,37 +313,8 @@
}
- private class ReceiverListener
- implements MessageExchangeContextListener {
- /**
- * @see org.apache.axis.ime.MessageExchangeContextListener#onMessageExchangeContext(MessageExchangeContext)
- */
- public void onMessageExchangeContext(
- MessageExchangeContext context) {
-
- MessageContext msgContext =
- context.getMessageContext();
- MessageExchangeCorrelator correlator =
- context.getMessageExchangeCorrelator();
-
- try {
- // there should be code here to see if the message
- // contains a fault. if so, the fault listener should
- // be invoked
- if (msgContext != null &&
- msgContext.getResponseMessage() != null &&
- receiveListener != null) {
- receiveListener.onReceive(correlator, msgContext);
- }
- } catch (Exception exception) {
- if (faultListener != null)
- faultListener.onFault(
- correlator, exception);
- }
-
- }
- }
+ // -- MessageExchangeLifecycle Implementation --- //
/**
* @see org.apache.axis.ime.MessageExchangeLifecycle#awaitShutdown()
@@ -404,18 +353,21 @@
provider.shutdown(force);
}
- /**
- * @see org.apache.axis.ime.MessageExchange#receive()
- */
- public MessageContext receive() throws AxisFault {
- throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00"));
+ public synchronized void setMessageExchangeFaultListener(
+ MessageExchangeFaultListener listener) {
+ this.faultListener = listener;
}
-
- /**
- * @see org.apache.axis.ime.MessageExchange#receive(long)
- */
- public MessageContext receive(long timeout) throws AxisFault {
- throw new UnsupportedOperationException(Messages.getMessage("unsupportedOperationException00"));
+
+ public synchronized MessageExchangeFaultListener getMessageExchangeFaultListener() {
+ return this.faultListener;
+ }
+
+ public synchronized void setMessageExchangeStatusListener(
+ MessageExchangeStatusListener listener) {
+ this.statusListener = listener;
+ }
+
+ public synchronized MessageExchangeStatusListener getMessageExchangeStatusListener() {
+ return this.statusListener;
}
-
}
1.1 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeSendContext.java
Index: MessageExchangeSendContext.java
===================================================================
/*
* The Apache Software License, Version 1.1
*
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Axis" and "Apache Software Foundation" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache",
* nor may "Apache" appear in their name, without prior written
* permission of the Apache Software Foundation.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
package org.apache.axis.ime.internal;
import org.apache.axis.MessageContext;
import org.apache.axis.ime.MessageExchangeCorrelator;
import org.apache.axis.ime.MessageExchangeFaultListener;
import org.apache.axis.ime.MessageExchangeStatusListener;
import java.io.Serializable;
/**
 * Carries everything associated with one outbound message: its correlator,
 * the {@link MessageContext} to send, and the fault and status listeners
 * supplied by the caller.
 *
 * Note: the only challenge with making this class serializable
 * is that org.apache.axis.MessageContext is currently NOT
 * serializable. MessageContext needs to change in order to
 * take advantage of persistent Channels and CorrelatorServices
 *
 * For thread safety, instances of this class are immutable: every field is
 * final and assigned exactly once during construction.
 *
 * @author James M Snell (jasnell@us.ibm.com)
 */
public final class MessageExchangeSendContext
        implements Serializable {

    /**
     * Creates a send context holding the given values. No argument is
     * validated; any of them may be {@code null}.
     *
     * @param correlator     correlator identifying the exchange
     * @param context        message context to be sent
     * @param faultListener  listener to notify of faults
     * @param statusListener status listener stored with the context
     * @return a new, fully initialised context
     */
    public static MessageExchangeSendContext newInstance(
            MessageExchangeCorrelator correlator,
            MessageContext context,
            MessageExchangeFaultListener faultListener,
            MessageExchangeStatusListener statusListener) {
        return new MessageExchangeSendContext(
                correlator,
                context,
                faultListener,
                statusListener);
    }

    // Declared final so the documented immutability is enforced by the
    // compiler instead of relying on callers never writing to the fields.
    protected final MessageExchangeCorrelator correlator;
    protected final MessageExchangeFaultListener faultListener;
    protected final MessageExchangeStatusListener statusListener;
    protected final MessageContext context;

    /**
     * Creates a context whose fields are all {@code null}. Retained so
     * existing callers of the no-argument constructor keep compiling.
     */
    protected MessageExchangeSendContext() {
        this(null, null, null, null);
    }

    /** Single point at which the final fields are assigned. */
    private MessageExchangeSendContext(
            MessageExchangeCorrelator correlator,
            MessageContext context,
            MessageExchangeFaultListener faultListener,
            MessageExchangeStatusListener statusListener) {
        this.correlator = correlator;
        this.context = context;
        this.faultListener = faultListener;
        this.statusListener = statusListener;
    }

    /** @return the correlator given at creation, possibly {@code null} */
    public MessageExchangeCorrelator getMessageExchangeCorrelator() {
        return this.correlator;
    }

    /** @return the message context given at creation, possibly {@code null} */
    public MessageContext getMessageContext() {
        return this.context;
    }

    /** @return the fault listener given at creation, possibly {@code null} */
    public MessageExchangeFaultListener getMessageExchangeFaultListener() {
        return this.faultListener;
    }

    /** @return the status listener given at creation, possibly {@code null} */
    public MessageExchangeStatusListener getMessageExchangeStatusListener() {
        return this.statusListener;
    }
}
1.1 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeSendListener.java
Index: MessageExchangeSendListener.java
===================================================================
/*
* The Apache Software License, Version 1.1
*
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Axis" and "Apache Software Foundation" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache",
* nor may "Apache" appear in their name, without prior written
* permission of the Apache Software Foundation.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
package org.apache.axis.ime.internal;
import java.io.Serializable;
/**
 * Callback through which a {@link MessageExchangeSendContext} is handed
 * over for sending.
 *
 * @author James M Snell (jasnell@us.ibm.com)
 */
public interface MessageExchangeSendListener
        extends Serializable {

    /**
     * Called with the send context that is to be processed.
     *
     * @param context the send context to process
     */
    void onSend(MessageExchangeSendContext context);
}
1.1 xml-axis/java/src/org/apache/axis/ime/internal/FirstComeFirstServeDispatchPolicy.java
Index: FirstComeFirstServeDispatchPolicy.java
===================================================================
/*
* The Apache Software License, Version 1.1
*
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Axis" and "Apache Software Foundation" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache",
* nor may "Apache" appear in their name, without prior written
* permission of the Apache Software Foundation.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
package org.apache.axis.ime.internal;
import org.apache.axis.MessageContext;
import org.apache.axis.ime.MessageExchangeCorrelator;
import org.apache.axis.ime.MessageContextListener;
import org.apache.axis.ime.MessageExchangeFaultListener;
import org.apache.axis.ime.internal.util.KeyedBuffer;
/**
 * Dispatch policy that offers a message to the first receive request able
 * to take it: a request registered under the message's correlator wins,
 * otherwise the next request in the receive-request buffer is used. When
 * no request is available the message is stored in the receive buffer.
 *
 * @author James M Snell (jasnell@us.ibm.com)
 */
public class FirstComeFirstServeDispatchPolicy
        implements ReceivedMessageDispatchPolicy {

    /** Buffer queried for pending {@link MessageExchangeReceiveContext} entries. */
    protected KeyedBuffer RECEIVE_REQUESTS;

    /** Buffer into which undeliverable messages are put, keyed by correlator. */
    protected KeyedBuffer RECEIVE;

    /**
     * @param RECEIVE          buffer that stores messages nobody is waiting for
     * @param RECEIVE_REQUESTS buffer holding the outstanding receive requests
     */
    public FirstComeFirstServeDispatchPolicy(
            KeyedBuffer RECEIVE,
            KeyedBuffer RECEIVE_REQUESTS) {
        this.RECEIVE = RECEIVE;
        this.RECEIVE_REQUESTS = RECEIVE_REQUESTS;
    }

    /**
     * Delivers the message carried by {@code context}.
     *
     * The receive request is looked up by correlator first and, failing
     * that, the next request of any kind is taken. If neither exists the
     * context is put into the receive buffer under its correlator. Any
     * {@link Exception} thrown by the listener's onReceive is passed to the
     * request's fault listener when one is set, and dropped otherwise.
     *
     * @param context the send context holding the correlator and message
     */
    public void dispatch(
            MessageExchangeSendContext context) {
        final MessageExchangeCorrelator correlator =
                context.getMessageExchangeCorrelator();

        // Prefer a request registered for this correlator; fall back to
        // whichever request is next in the buffer.
        MessageExchangeReceiveContext pending =
                (MessageExchangeReceiveContext) RECEIVE_REQUESTS.get(correlator);
        if (pending == null) {
            pending = (MessageExchangeReceiveContext) RECEIVE_REQUESTS.get();
        }

        // Nobody is waiting: park the message until a receiver shows up.
        if (pending == null) {
            RECEIVE.put(correlator, context);
            return;
        }

        final MessageExchangeFaultListener onFault =
                pending.getMessageExchangeFaultListener();
        final MessageContextListener receiver =
                pending.getMessageContextListener();
        final MessageContext message = context.getMessageContext();
        try {
            receiver.onReceive(correlator, message);
        } catch (Exception failure) {
            if (onFault != null) {
                onFault.onFault(correlator, failure);
            }
        }
    }
}
1.1 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeReceiveContext.java
Index: MessageExchangeReceiveContext.java
===================================================================
package org.apache.axis.ime.internal;
import org.apache.axis.ime.MessageContextListener;
import org.apache.axis.ime.MessageExchangeFaultListener;
import org.apache.axis.ime.MessageExchangeStatusListener;
import org.apache.axis.ime.MessageExchangeCorrelator;
/**
 * Holder for everything that describes one receive request: the
 * correlator being waited on and the listeners to call back. Instances
 * are obtained through {@link #newInstance}.
 *
 * @author James M Snell (jasnell@us.ibm.com)
 */
public class MessageExchangeReceiveContext {

    protected MessageContextListener listener;
    protected MessageExchangeFaultListener faultListener;
    protected MessageExchangeStatusListener statusListener;
    protected MessageExchangeCorrelator correlator;

    /** Only the factory method and subclasses create instances. */
    protected MessageExchangeReceiveContext() {}

    /**
     * Builds a receive context from its four parts. The arguments are
     * stored as given; none of them is validated.
     *
     * @param correlator     correlator associated with the request
     * @param listener       listener for received message contexts
     * @param faultListener  listener for faults
     * @param statusListener listener for status updates
     * @return a new context holding the supplied references
     */
    public static MessageExchangeReceiveContext newInstance(
            MessageExchangeCorrelator correlator,
            MessageContextListener listener,
            MessageExchangeFaultListener faultListener,
            MessageExchangeStatusListener statusListener) {
        final MessageExchangeReceiveContext receiveContext =
                new MessageExchangeReceiveContext();
        receiveContext.correlator = correlator;
        receiveContext.listener = listener;
        receiveContext.faultListener = faultListener;
        receiveContext.statusListener = statusListener;
        return receiveContext;
    }

    /** @return the correlator given at creation */
    public MessageExchangeCorrelator getMessageExchangeCorrelator() {
        return correlator;
    }

    /** @return the message listener given at creation */
    public MessageContextListener getMessageContextListener() {
        return listener;
    }

    /** @return the fault listener given at creation */
    public MessageExchangeFaultListener getMessageExchangeFaultListener() {
        return faultListener;
    }

    /** @return the status listener given at creation */
    public MessageExchangeStatusListener getMessageExchangeStatusListener() {
        return statusListener;
    }
}
1.1 xml-axis/java/src/org/apache/axis/ime/internal/ReceivedMessageDispatchPolicy.java
Index: ReceivedMessageDispatchPolicy.java
===================================================================
/*
* The Apache Software License, Version 1.1
*
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Axis" and "Apache Software Foundation" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache",
* nor may "Apache" appear in their name, without prior written
* permission of the Apache Software Foundation.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
package org.apache.axis.ime.internal;
/**
* Strategy interface that decides what happens to a message described
* by a {@link MessageExchangeSendContext}.
* FirstComeFirstServeDispatchPolicy in this package is one
* implementation.
*
* @author James M Snell (jasnell@us.ibm.com)
*/
public interface ReceivedMessageDispatchPolicy {
/**
* Dispatches the message carried by the given context.
*
* @param context the context holding the message to dispatch
*/
public void dispatch(
MessageExchangeSendContext context);
}
1.5 +54 -65 xml-axis/java/src/org/apache/axis/ime/MessageExchange.java
Index: MessageExchange.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/MessageExchange.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- MessageExchange.java 28 Oct 2002 21:59:45 -0000 1.4
+++ MessageExchange.java 29 Oct 2002 05:15:29 -0000 1.5
@@ -80,22 +80,23 @@
throws AxisFault;
/**
- * Will attempt to cancel the outbound MessageExchange
- * process for a given message context. Returns true if
- * an only if the MessageContext was canceled. A false
- * response indicates that the MessageContext could not
- * be removed from the outbound channel for whatever
- * reason.
- * @param MessageExchangeCorrelator The correlator for the message being canceled
- * @return MessageContext The canceled MessageContext
+ * Send an outbound message. (Impl's of this method
+ * need to create a new MessageExchangeCorrelator and
+ * put it into the MessageContext if one does not already
+ * exist.)
+ * @param MessageContext The Axis MessageContext being sent
+ * @param MessageContextListener The listener to which responses, faults, and status updates should be delivered
+ * @return MessageExchangeCorrelator The correlator for the sent MessageContext
* @throws AxisFault
*/
- public MessageContext cancel(
- MessageExchangeCorrelator correlator)
+ public MessageExchangeCorrelator send(
+ MessageContext context,
+ MessageContextListener listener)
throws AxisFault;
/**
* Waits indefinitely for a message to be received
+ * (blocking)
* @return MessageContext The received MessageContext
* @throws AxisFault
*/
@@ -105,6 +106,7 @@
/**
* Waits the specified amount of time for a message to
* be received
+ * (blocking)
* @param long The amount of time (ms) to wait
* @return MessageContext The received MessageContext
* @throws AxisFault
@@ -114,29 +116,51 @@
throws AxisFault;
/**
- * Will instruct the MessageExchange provider to
- * wait for a message to be received.
+ * Waits indefinitely for a message matching the
+ * specified correlator
+ * (blocking)
+ * @param MessageExchangeCorrelator
+ * @return MessageContext
* @throws AxisFault
*/
- public void startListening()
+ public MessageContext receive(
+ MessageExchangeCorrelator correlator)
throws AxisFault;
/**
- * Will instruct the MessageExchange provider to
- * wait for a specific MessageExchangeCorrelator
- * @param MessageExchangeCorrelator The correlator of the MessageContext to listen for
+ * Waits the specified amount of time for a message matching the
+ * specified correlator
+ * (blocking)
+ * @param MessageExchangeCorrelator
+ * @param long timeout
+ * @returns MessageContext
* @throws AxisFault
*/
- public void startListening(
- MessageExchangeCorrelator correlator)
+ public MessageContext receive(
+ MessageExchangeCorrelator correlator,
+ long timeout)
throws AxisFault;
/**
- * Will instruct the MessageExchange provider to
- * stop listening
+ * Registers a listener for receiving messages
+ * (nonblocking)
+ * @param MessageContextListener
* @throws AxisFault
*/
- public void stopListening()
+ public void receive(
+ MessageContextListener listener)
+ throws AxisFault;
+
+ /**
+ * Registers a listener for receiving messages
+ * (nonblocking)
+ * @param MessageExchangeCorrelator
+ * @param MessageContextListener
+ * @throws AxisFault
+ */
+ public void receive(
+ MessageExchangeCorrelator correlator,
+ MessageContextListener listener)
throws AxisFault;
/**
@@ -161,49 +185,14 @@
long timeout)
throws AxisFault;
- /**
- * Allows applications to listen for changes to
- * the current disposition of the MessageExchange operation
- * (push model)
- * @param MessageExchangeStatusListener
- * @throws AxisFault
- */
- public void setMessageExchangeStatusListener(
- MessageExchangeStatusListener listener)
- throws AxisFault;
-
- /**
- * Allows applications to listen for inbound messages
- * (push model)
- * @param MessageExchangeReceiveListener
- * @throws AxisFault
- */
- public void setMessageExchangeReceiveListener(
- MessageExchangeReceiveListener listener)
- throws AxisFault;
-
- /**
- * Allows applications to listen for faults/exceptions
- * (push model)
- * @param MessageExchangeFaultListener
- * @throws AxisFault
- */
public void setMessageExchangeFaultListener(
- MessageExchangeFaultListener listener)
- throws AxisFault;
-
- /**
- * Allows MessageExchange consumers low level access
- * to the Send message channel
- * @return MessageChannel
- */
- public MessageChannel getSendChannel();
-
- /**
- * Allows MessageExchange consumers low level access
- * to the Receive message channel
- * @return MessageChannel
- */
- public MessageChannel getReceiveChannel();
-
+ MessageExchangeFaultListener listener);
+
+ public MessageExchangeFaultListener getMessageExchangeFaultListener();
+
+ public void setMessageExchangeStatusListener(
+ MessageExchangeStatusListener listener);
+
+ public MessageExchangeStatusListener getMessageExchangeStatusListener();
+
}
1.4 +2 -2 xml-axis/java/src/org/apache/axis/ime/MessageExchangeCorrelatorService.java
Index: MessageExchangeCorrelatorService.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/MessageExchangeCorrelatorService.java,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- MessageExchangeCorrelatorService.java 28 Oct 2002 21:45:59 -0000 1.3
+++ MessageExchangeCorrelatorService.java 29 Oct 2002 05:15:30 -0000 1.4
@@ -61,9 +61,9 @@
public void put(
MessageExchangeCorrelator correlator,
- MessageExchangeContext context);
+ Object context);
- public MessageExchangeContext get(
+ public Object get(
MessageExchangeCorrelator correlator);
}
1.1 xml-axis/java/src/org/apache/axis/ime/MessageContextListener.java
Index: MessageContextListener.java
===================================================================
/*
* The Apache Software License, Version 1.1
*
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Axis" and "Apache Software Foundation" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache",
* nor may "Apache" appear in their name, without prior written
* permission of the Apache Software Foundation.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
package org.apache.axis.ime;
import java.io.Serializable;
import org.apache.axis.MessageContext;
/**
* Base class for listeners interested in events of a message exchange.
* All three callbacks have empty bodies here, so a subclass only needs
* to override the ones it cares about. The class is serializable.
*
* @author James M Snell (jasnell@us.ibm.com)
*/
public abstract class MessageContextListener
implements Serializable {
/**
* Fault callback; does nothing unless overridden.
*
* @param correlator the correlator the fault is reported for
* @param exception the reported problem
*/
public void onFault(
MessageExchangeCorrelator correlator,
Throwable exception) {}
/**
* Receive callback; does nothing unless overridden.
*
* @param correlator the correlator of the received message
* @param context the received message context
*/
public void onReceive(
MessageExchangeCorrelator correlator,
MessageContext context) {}
/**
* Status callback; does nothing unless overridden.
*
* @param correlator the correlator the status is reported for
* @param status the reported status
*/
public void onStatus(
MessageExchangeCorrelator correlator,
MessageExchangeStatus status) {}
}
1.1 xml-axis/java/src/org/apache/axis/ime/internal/util/KeyedBuffer.java
Index: KeyedBuffer.java
===================================================================
/*
* The Apache Software License, Version 1.1
*
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Axis" and "Apache Software Foundation" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache",
* nor may "Apache" appear in their name, without prior written
* permission of the Apache Software Foundation.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
package org.apache.axis.ime.internal.util;
/**
* A KeyedBuffer is a low level hybrid of a FIFO queue and a keyed map.
* Each MessageExchange implementation will create at least two
* KeyedBuffers, one for messages being sent, and another for
* messages that have been received.
*
* KeyedBuffers differ from traditional FIFO queues in that
* elements put in are keyed and can be taken out of order.
*
* Different implementations may allow for variations on
* how the KeyedBuffer model is implemented. For instance,
* the code will ship with a NonPersistentKeyedBuffer that
* will store all contained objects in memory. The fact that
* everything is stored in memory means that the buffer is not
* fault tolerant. If fault tolerance is required, then a
* persistent KeyedBuffer must be created that persists the
* objects somehow.
*
* @author James M Snell (jasnell@us.ibm.com)
*/
public interface KeyedBuffer {
/**
* Select, but do not remove, the next object in the
* buffer. If one does not exist, return null.
*
* @return the next object, or null if the buffer is empty
*/
public Object peek();
/**
* Put an object into the buffer under the given key.
*
* @param key the key the object can later be selected by
* @param context the object to store
*/
public void put(
Object key,
Object context);
/**
* Cancel an object that has been put into the buffer.
* Unlike select(Object key), this method will not block
* and wait for an object with the specified key to be
* put into the buffer.
*
* @param key the key of the object to cancel
* @return the cancelled object
* (NOTE(review): presumably null when the key is not
* present - confirm against implementations)
*/
public Object cancel(
Object key);
/**
* Select and remove all of the objects currently in
* the buffer (useful for bulk operations). This
* method will not block. It is also not guaranteed
* that the buffer will be empty once this operation
* returns (it is possible that another thread may
* put new objects into the buffer before this
* operation completes).
*
* @return the removed objects
*/
public Object[] selectAll();
/**
* Select and remove the next object in the buffer.
* If an object is not available, wait indefinitely for one.
*
* @return the removed object
* @throws InterruptedException if the thread is interrupted while waiting
*/
public Object select()
throws InterruptedException;
/**
* Select and remove the next object in the buffer.
* If an object is not available, wait the specified amount
* of time for one.
*
* @param timeout how long to wait (NonPersistentKeyedBuffer hands
* this value to Object.wait(long), i.e. milliseconds)
* @return the removed object
* (NOTE(review): the result when the timeout elapses is not
* specified here - presumably null, confirm)
* @throws InterruptedException if the thread is interrupted while waiting
*/
public Object select(
long timeout)
throws InterruptedException;
/**
* Select and remove a specific object in the buffer.
* If the object is not available, wait indefinitely
* for one to be available.
*
* @param key the key of the object to select
* @return the removed object
* @throws InterruptedException if the thread is interrupted while waiting
*/
public Object select(
Object key)
throws InterruptedException;
/**
* Select and remove a specific object in the buffer.
* If the object is not available, wait the specified
* amount of time for one.
*
* @param key the key of the object to select
* @param timeout how long to wait
* (NOTE(review): presumably milliseconds - confirm)
* @return the removed object
* (NOTE(review): the result when the timeout elapses is not
* specified here - presumably null, confirm)
* @throws InterruptedException if the thread is interrupted while waiting
*/
public Object select(
Object key,
long timeout)
throws InterruptedException;
/**
* Select and remove the next object in the buffer
* (does not wait for an object to be put into the buffer).
*
* @return the removed object; callers in this package test the
* result against null to detect that nothing was available
*/
public Object get();
/**
* Select and remove the specified object in the buffer
* (does not wait for an object to be put into the buffer).
*
* @param key the key of the object to remove
* @return the removed object; callers in this package test the
* result against null to detect that nothing was available
*/
public Object get(Object key);
}
1.1 xml-axis/java/src/org/apache/axis/ime/internal/util/WorkerPool.java
Index: WorkerPool.java
===================================================================
/*
* The Apache Software License, Version 1.1
*
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Axis" and "Apache Software Foundation" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache",
* nor may "Apache" appear in their name, without prior written
* permission of the Apache Software Foundation.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
package org.apache.axis.ime.internal.util;
import org.apache.axis.i18n.Messages;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
/**
 * Keeps track of the threads that run the workers of a message exchange
 * and coordinates their shutdown.
 *
 * Every worker added through {@link #addWorker} is started on its own
 * thread. A worker reports back through {@link #workerDone} when its
 * run ends; while the pool has not been shut down the worker is
 * restarted on a fresh thread. The worker count and the shutdown flag
 * are guarded by this object's monitor.
 *
 * @author James M Snell (jasnell@us.ibm.com)
 */
public class WorkerPool {

    /** Upper bound on the number of simultaneously registered workers. */
    public static final long MAX_THREADS = 100;

    /** Maps each registered worker (Runnable) to the Thread running it. */
    protected Map threads = new Hashtable();

    /** Not read or written by this class. */
    protected boolean interrupt;

    /** Number of workers currently registered; guarded by {@code this}. */
    protected long threadcount;

    /** Set once a shutdown has been requested; guarded by {@code this}. */
    public boolean _shutdown;

    /**
     * Returns true if shutdown was requested and no worker is left.
     */
    public boolean isShutdown() {
        synchronized (this) {
            return _shutdown && threadcount == 0;
        }
    }

    /**
     * Returns true once a shutdown has been requested, whether or not
     * workers are still running.
     */
    public boolean isShuttingDown() {
        synchronized (this) {
            return _shutdown;
        }
    }

    /**
     * Returns the total number of currently registered workers.
     */
    public long getWorkerCount() {
        synchronized (this) {
            return threadcount;
        }
    }

    /**
     * Adds a new worker to the pool and starts it on its own thread.
     *
     * The method is synchronized so that the shutdown/limit check and
     * the increment of the worker count happen atomically with respect
     * to the other methods of the pool.
     *
     * @param worker the worker to run
     * @throws IllegalStateException if the pool is shutting down or
     *         already holds {@link #MAX_THREADS} workers
     */
    public synchronized void addWorker(
            Runnable worker) {
        if (_shutdown ||
                threadcount >= MAX_THREADS)
            throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
        Thread thread = new Thread(worker);
        threads.put(worker, thread);
        threadcount++;
        thread.start();
    }

    /**
     * Forcefully interrupt all workers.
     */
    public void interruptAll() {
        // Hold the map's own lock so the set of threads cannot change
        // while it is being iterated.
        synchronized (threads) {
            for (Iterator i = threads.values().iterator(); i.hasNext();) {
                Thread t = (Thread) i.next();
                t.interrupt();
            }
        }
    }

    /**
     * Forcefully shutdown the pool: flags the shutdown and interrupts
     * every worker thread.
     */
    public void shutdown() {
        synchronized (this) {
            _shutdown = true;
        }
        interruptAll();
    }

    /**
     * Gracefully shutdown the pool: flags the shutdown but does not
     * interrupt the workers, which are left to finish on their own.
     */
    public void safeShutdown() {
        synchronized (this) {
            _shutdown = true;
        }
    }

    /**
     * Blocks until every worker has reported completion.
     *
     * @throws IllegalStateException if no shutdown has been requested
     * @throws InterruptedException if the calling thread is interrupted
     */
    public synchronized void awaitShutdown()
            throws InterruptedException {
        if (!_shutdown)
            throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
        while (threadcount > 0)
            wait();
    }

    /**
     * Blocks until every worker has reported completion or the timeout
     * has elapsed, whichever comes first.
     *
     * @param timeout maximum time to wait, in milliseconds; a value of
     *        zero or less means "do not wait"
     * @return true if no worker is left, false if the timeout elapsed
     * @throws IllegalStateException if no shutdown has been requested
     * @throws InterruptedException if the calling thread is interrupted
     */
    public synchronized boolean awaitShutdown(long timeout)
            throws InterruptedException {
        if (!_shutdown)
            throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
        if (threadcount == 0)
            return true;
        long waittime = timeout;
        if (waittime <= 0)
            return false;
        long start = System.currentTimeMillis();
        for (; ;) {
            wait(waittime);
            if (threadcount == 0)
                return true;
            // Remaining time is the budget minus what has already
            // elapsed; subtracting the absolute clock value would make
            // it negative immediately.
            waittime = timeout - (System.currentTimeMillis() - start);
            if (waittime <= 0)
                return false;
        }
    }

    /**
     * Used by workers to notify the pool that their run has ended.
     *
     * The worker is unregistered. If the pool is shutting down and this
     * was the last worker, threads blocked in awaitShutdown are woken.
     * If the pool is not shutting down, the worker is added again and
     * thereby restarted on a new thread.
     *
     * @param worker the worker that finished
     */
    public synchronized void workerDone(
            Runnable worker) {
        threads.remove(worker);
        if (--threadcount == 0 && _shutdown) {
            notifyAll();
        }
        if (!_shutdown) {
            addWorker(worker);
        }
    }
}
1.1 xml-axis/java/src/org/apache/axis/ime/internal/util/NonPersistentKeyedBuffer.java
Index: NonPersistentKeyedBuffer.java
===================================================================
/*
* The Apache Software License, Version 1.1
*
*
* Copyright (c) 2001 The Apache Software Foundation. All rights
* reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
*
* 3. The end-user documentation included with the redistribution,
* if any, must include the following acknowledgment:
* "This product includes software developed by the
* Apache Software Foundation (http://www.apache.org/)."
* Alternately, this acknowledgment may appear in the software itself,
* if and wherever such third-party acknowledgments normally appear.
*
* 4. The names "Axis" and "Apache Software Foundation" must
* not be used to endorse or promote products derived from this
* software without prior written permission. For written
* permission, please contact apache@apache.org.
*
* 5. Products derived from this software may not be called "Apache",
* nor may "Apache" appear in their name, without prior written
* permission of the Apache Software Foundation.
*
* THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
* OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE APACHE SOFTWARE FOUNDATION OR
* ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
* USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*/
package org.apache.axis.ime.internal.util;
import org.apache.axis.i18n.Messages;
import java.util.Vector;
/**
* Creates a non-persistent KeyedBuffer. Queued messages
* are stored in memory. If the buffer instance is destroyed,
* so is the Queue.
*
* @author James M Snell (jasnell@us.ibm.com)
*/
public class NonPersistentKeyedBuffer
implements KeyedBuffer {
private final KeyedQueue messages = new KeyedQueue();
private WorkerPool WORKERS;
/**
 * Creates an empty in-memory buffer.
 *
 * @param workers the pool whose shutdown state the blocking select
 *        methods check before taking or waiting for an object
 */
public NonPersistentKeyedBuffer(WorkerPool workers) {
    WORKERS = workers;
}
/**
 * Returns the value of the node at the head of the queue without
 * removing it.
 *
 * @return the head value, or null when the queue yields no node
 */
public Object peek() {
    final KeyedNode head;
    synchronized (messages) {
        head = messages.peek();
    }
    return (head == null) ? null : head.value;
}
public void put(
Object key,
Object object) {
if (key == null ||
object == null)
throw new IllegalArgumentException(Messages.getMessage("illegalArgumentException00"));
synchronized (messages) {
messages.put(new KeyedNode(key, object));
messages.notify();
}
}
public Object cancel(Object key) {
if (key == null)
throw new IllegalArgumentException(Messages.getMessage("illegalArgumentException00"));
Object object = null;
synchronized (messages) {
KeyedNode node = messages.select(key); // will attempt to find and remove
if (node != null)
object = node.value;
node.key = null;
node.value = null;
}
return object;
}
public Object[] selectAll() {
Vector v = new Vector();
KeyedNode node = null;
synchronized (messages) {
while ((node = messages.select()) != null) {
v.add(node.value);
node.key = null;
node.value = null;
}
}
Object[] objects = new
Object[v.size()];
v.copyInto(objects);
return objects;
}
public Object select()
throws InterruptedException {
for (; ;) {
if (WORKERS.isShuttingDown())
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
KeyedNode node = null;
synchronized (messages) {
node = messages.select();
}
if (node != null) {
Object object = node.value;
node.key = null;
node.value = null;
return object;
} else {
messages.wait();
}
}
}
public Object select(long timeout)
throws InterruptedException {
for (; ;) {
if (WORKERS.isShuttingDown())
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
KeyedNode node = null;
synchronized (messages) {
node = messages.select();
}
if (node != null) {
Object object = node.value;
node.key = null;
node.value = null;
return object;
} else {
messages.wait(timeout);
}
}
}
public Object select(Object key)
throws InterruptedException {
for (; ;) {
if (WORKERS.isShuttingDown())
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
KeyedNode node = null;
synchronized (messages) {
node = messages.select(key);
}
if (node != null) {
Object object = node.value;
node.key = null;
node.value = null;
return object;
} else {
messages.wait();
}
}
}
public Object select(Object key, long timeout)
throws InterruptedException {
for (; ;) {
if (WORKERS.isShuttingDown())
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
KeyedNode node = null;
synchronized (messages) {
node = messages.select(key);
}
if (node != null) {
Object object = node.value;
node.key = null;
node.value = null;
return object;
} else {
messages.wait(timeout);
}
}
}
public Object get() {
KeyedNode node = null;
Object object = null;
synchronized (messages) {
node = messages.select();
}
if (node != null) {
object = node.value;
node.key = null;
node.value = null;
}
return object;
}
public Object get(Object key) {
KeyedNode node = null;
Object object = null;
synchronized (messages) {
node = messages.select(key);
}
if (node != null) {
object = node.value;
node.key = null;
node.value = null;
}
return object;
}
/// Support Classes ///
protected static class KeyedNode {
public Object key;
public Object value;
public KeyedNode next;
public KeyedNode() {
}
public KeyedNode(
Object key,
Object value) {
this.key = key;
this.value = value;
}
public KeyedNode(
Object key,
Object value,
KeyedNode next) {
this(key, value);
this.next = next;
}
}
protected static class KeyedQueue {
protected KeyedNode head;
protected KeyedNode last;
protected void put(KeyedNode node) {
if (last == null) {
last = head = node;
} else {
last = last.next = node;
}
}
protected KeyedNode select() {
KeyedNode node = head;
if (node != null && (head = node.next) == null) {
last = null;
}
if (node != null)
node.next = null;
return node;
}
protected KeyedNode select(Object key) {
KeyedNode previous = null;
for (KeyedNode node = head; node != null; node = node.next) {
if (node.key.equals(key)) {
if (previous != null)
previous.next = node.next;
node.next = null;
return node;
}
previous = node;
}
return null;
}
protected KeyedNode peek() {
KeyedNode node = head;
return node;
}
}
}