You are viewing a plain text version of this content. The canonical link for it is here.
Posted to java-dev@axis.apache.org by jm...@apache.org on 2002/10/30 00:33:41 UTC
cvs commit: xml-axis/java/src/org/apache/axis/ime/internal MessageExchangeImpl.java FirstComeFirstServeDispatchPolicy.java MessageExchangeProvider.java
jmsnell 2002/10/29 15:33:40
Modified: java/src/org/apache/axis/ime/internal/util/handler
HandlerMessageExchange.java
MessageExchangeHandler.java
java/src/org/apache/axis/ime/internal/util
NonPersistentKeyedBuffer.java WorkerPool.java
java/src/org/apache/axis/ime/internal
MessageExchangeImpl.java
FirstComeFirstServeDispatchPolicy.java
MessageExchangeProvider.java
Log:
Added logging support to the impl classes
Revision Changes Path
1.2 +12 -0 xml-axis/java/src/org/apache/axis/ime/internal/util/handler/HandlerMessageExchange.java
Index: HandlerMessageExchange.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/util/handler/HandlerMessageExchange.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- HandlerMessageExchange.java 29 Oct 2002 06:15:08 -0000 1.1
+++ HandlerMessageExchange.java 29 Oct 2002 23:33:40 -0000 1.2
@@ -65,6 +65,8 @@
import org.apache.axis.ime.internal.MessageExchangeSendListener;
import org.apache.axis.ime.internal.ReceivedMessageDispatchPolicy;
import org.apache.axis.ime.internal.FirstComeFirstServeDispatchPolicy;
+import org.apache.axis.components.logger.LogFactory;
+import org.apache.commons.logging.Log;
/**
* Used to wrap synchronous handlers (e.g. Axis 1.0 transports)
@@ -74,6 +76,9 @@
public class HandlerMessageExchange
extends MessageExchangeProvider {
+ protected static Log log =
+ LogFactory.getLog(HandlerMessageExchange.class.getName());
+
private Handler handler;
public HandlerMessageExchange(Handler handler) {
@@ -105,6 +110,9 @@
*/
public void onSend(
MessageExchangeSendContext context) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: HandlerMessageExchange.Listener::onSend");
+ }
MessageExchangeFaultListener listener =
context.getMessageExchangeFaultListener();
try {
@@ -123,6 +131,10 @@
listener.onFault(
context.getMessageExchangeCorrelator(),
exception);
+ } finally {
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: HandlerMessageExchange.Listener::onSend");
+ }
}
}
}
1.2 +11 -0 xml-axis/java/src/org/apache/axis/ime/internal/util/handler/MessageExchangeHandler.java
Index: MessageExchangeHandler.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/util/handler/MessageExchangeHandler.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- MessageExchangeHandler.java 29 Oct 2002 06:15:08 -0000 1.1
+++ MessageExchangeHandler.java 29 Oct 2002 23:33:40 -0000 1.2
@@ -58,6 +58,8 @@
import org.apache.axis.MessageContext;
import org.apache.axis.ime.MessageExchange;
import org.apache.axis.handlers.BasicHandler;
+import org.apache.axis.components.logger.LogFactory;
+import org.apache.commons.logging.Log;
/**
* This could probably be a bit more sophisticated,
@@ -68,6 +70,9 @@
public class MessageExchangeHandler
extends BasicHandler {
+ protected static Log log =
+ LogFactory.getLog(MessageExchangeHandler.class.getName());
+
private MessageExchange messageExchange;
public MessageExchangeHandler() {}
@@ -79,7 +84,13 @@
public void invoke(
MessageContext msgContext)
throws AxisFault {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeHandler::invoke");
+ }
msgContext = messageExchange.sendAndReceive(msgContext);
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeHandler::invoke");
+ }
}
public MessageExchange getMessageExchange() {
1.2 +60 -1 xml-axis/java/src/org/apache/axis/ime/internal/util/NonPersistentKeyedBuffer.java
Index: NonPersistentKeyedBuffer.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/util/NonPersistentKeyedBuffer.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- NonPersistentKeyedBuffer.java 29 Oct 2002 05:15:30 -0000 1.1
+++ NonPersistentKeyedBuffer.java 29 Oct 2002 23:33:40 -0000 1.2
@@ -56,6 +56,9 @@
package org.apache.axis.ime.internal.util;
import org.apache.axis.i18n.Messages;
+import org.apache.axis.components.logger.LogFactory;
+import org.apache.commons.logging.Log;
+
import java.util.Vector;
/**
@@ -68,6 +71,9 @@
public class NonPersistentKeyedBuffer
implements KeyedBuffer {
+ protected static Log log =
+ LogFactory.getLog(NonPersistentKeyedBuffer.class.getName());
+
private final KeyedQueue messages = new KeyedQueue();
private WorkerPool WORKERS;
@@ -92,7 +98,9 @@
public void put(
Object key,
Object object) {
-
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: KeyedBuffer::put");
+ }
if (key == null ||
object == null)
throw new IllegalArgumentException(Messages.getMessage("illegalArgumentException00"));
@@ -101,9 +109,15 @@
messages.put(new KeyedNode(key, object));
messages.notify();
}
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: KeyedBuffer::put");
+ }
}
public Object cancel(Object key) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: KeyedBuffer::cancel");
+ }
if (key == null)
throw new IllegalArgumentException(Messages.getMessage("illegalArgumentException00"));
Object object = null;
@@ -114,10 +128,16 @@
node.key = null;
node.value = null;
}
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: KeyedBuffer::cancel");
+ }
return object;
}
public Object[] selectAll() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: KeyedBuffer::selectAll");
+ }
Vector v = new Vector();
KeyedNode node = null;
synchronized (messages) {
@@ -130,11 +150,17 @@
Object[] objects = new
Object[v.size()];
v.copyInto(objects);
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: KeyedBuffer::selectAll");
+ }
return objects;
}
public Object select()
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: KeyedBuffer::select");
+ }
for (; ;) {
if (WORKERS.isShuttingDown())
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
@@ -146,6 +172,9 @@
Object object = node.value;
node.key = null;
node.value = null;
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: KeyedBuffer::select");
+ }
return object;
} else {
messages.wait();
@@ -155,6 +184,9 @@
public Object select(long timeout)
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: KeyedBuffer::select");
+ }
for (; ;) {
if (WORKERS.isShuttingDown())
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
@@ -166,6 +198,9 @@
Object object = node.value;
node.key = null;
node.value = null;
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: KeyedBuffer::select");
+ }
return object;
} else {
messages.wait(timeout);
@@ -175,6 +210,9 @@
public Object select(Object key)
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: KeyedBuffer::select");
+ }
for (; ;) {
if (WORKERS.isShuttingDown())
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
@@ -186,6 +224,9 @@
Object object = node.value;
node.key = null;
node.value = null;
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: KeyedBuffer::select");
+ }
return object;
} else {
messages.wait();
@@ -195,6 +236,9 @@
public Object select(Object key, long timeout)
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: KeyedBuffer::select");
+ }
for (; ;) {
if (WORKERS.isShuttingDown())
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
@@ -206,6 +250,9 @@
Object object = node.value;
node.key = null;
node.value = null;
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: KeyedBuffer::select");
+ }
return object;
} else {
messages.wait(timeout);
@@ -214,6 +261,9 @@
}
public Object get() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: KeyedBuffer::get");
+ }
KeyedNode node = null;
Object object = null;
synchronized (messages) {
@@ -224,10 +274,16 @@
node.key = null;
node.value = null;
}
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: KeyedBuffer::get");
+ }
return object;
}
public Object get(Object key) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: KeyedBuffer::get");
+ }
KeyedNode node = null;
Object object = null;
synchronized (messages) {
@@ -237,6 +293,9 @@
object = node.value;
node.key = null;
node.value = null;
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: KeyedBuffer::get");
}
return object;
}
1.3 +78 -10 xml-axis/java/src/org/apache/axis/ime/internal/util/WorkerPool.java
Index: WorkerPool.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/util/WorkerPool.java,v
retrieving revision 1.2
retrieving revision 1.3
diff -u -r1.2 -r1.3
--- WorkerPool.java 29 Oct 2002 22:29:26 -0000 1.2
+++ WorkerPool.java 29 Oct 2002 23:33:40 -0000 1.3
@@ -56,6 +56,8 @@
package org.apache.axis.ime.internal.util;
import org.apache.axis.i18n.Messages;
+import org.apache.axis.components.logger.LogFactory;
+import org.apache.commons.logging.Log;
import java.util.Hashtable;
import java.util.Iterator;
@@ -66,6 +68,9 @@
*/
public class WorkerPool {
+ protected static Log log =
+ LogFactory.getLog(WorkerPool.class.getName());
+
public static final long MAX_THREADS = 100;
protected Map threads = new Hashtable();
@@ -74,6 +79,9 @@
public void cleanup()
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: WorkerPool::cleanup");
+ }
if (!isShutdown()) {
safeShutdown();
awaitShutdown();
@@ -82,6 +90,9 @@
threads.clear();
_shutdown = false;
}
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: WorkerPool::cleanup");
+ }
}
/**
@@ -116,6 +127,9 @@
*/
public void addWorker(
Runnable worker) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: WorkerPool::addWorker");
+ }
if (_shutdown ||
threadcount == MAX_THREADS)
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
@@ -123,37 +137,58 @@
threads.put(worker, thread);
threadcount++;
thread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: WorkerPool::addWorker");
+ }
}
/**
* Forcefully interrupt all workers
*/
public void interruptAll() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: WorkerPool::interruptAll");
+ }
synchronized (threads) {
for (Iterator i = threads.values().iterator(); i.hasNext();) {
Thread t = (Thread) i.next();
t.interrupt();
}
}
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: WorkerPool::interruptAll");
+ }
}
/**
* Forcefully shutdown the pool
*/
public void shutdown() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: WorkerPool::shutdown");
+ }
synchronized (this) {
_shutdown = true;
}
interruptAll();
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: WorkerPool::shutdown");
+ }
}
/**
* Shut down the pool without interrupting its workers
*/
public void safeShutdown() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: WorkerPool::safeShutdown");
+ }
synchronized (this) {
_shutdown = true;
}
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: WorkerPool::safeShutdown");
+ }
}
/**
@@ -161,10 +196,16 @@
*/
public synchronized void awaitShutdown()
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: WorkerPool::awaitShutdown");
+ }
if (!_shutdown)
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
while (threadcount > 0)
wait();
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: WorkerPool::awaitShutdown");
+ }
}
/**
@@ -172,36 +213,63 @@
*/
public synchronized boolean awaitShutdown(long timeout)
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: WorkerPool::awaitShutdown");
+ }
if (!_shutdown)
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
- if (threadcount == 0)
+ if (threadcount == 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: WorkerPool::awaitShutdown");
+ }
return true;
+ }
long waittime = timeout;
- if (waittime <= 0)
+ if (waittime <= 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: WorkerPool::awaitShutdown");
+ }
return false;
+ }
long start = System.currentTimeMillis();
for (; ;) {
wait(waittime);
- if (threadcount == 0)
+ if (threadcount == 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: WorkerPool::awaitShutdown");
+ }
return true;
+ }
waittime = timeout - System.currentTimeMillis();
- if (waittime <= 0)
+ if (waittime <= 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: WorkerPool::awaitShutdown");
+ }
return false;
+ }
}
}
/**
* Used by MessageWorkers to notify the pool that it is done
*/
- public synchronized void workerDone(
+ public void workerDone(
Runnable worker) {
- threads.remove(worker);
- if (--threadcount == 0 && _shutdown) {
- notifyAll();
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: WorkerPool::workerDone");
}
- if (!_shutdown) {
- addWorker(worker);
+ synchronized(this) {
+ threads.remove(worker);
+ if (--threadcount == 0 && _shutdown) {
+ notifyAll();
+ }
+ if (!_shutdown) {
+ addWorker(worker);
+ }
}
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: WorkerPool::workerDone");
+ }
}
}
1.7 +66 -0 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeImpl.java
Index: MessageExchangeImpl.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeImpl.java,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -r1.6 -r1.7
--- MessageExchangeImpl.java 29 Oct 2002 22:29:26 -0000 1.6
+++ MessageExchangeImpl.java 29 Oct 2002 23:33:40 -0000 1.7
@@ -66,6 +66,8 @@
import org.apache.axis.ime.MessageContextListener;
import org.apache.axis.ime.MessageExchangeLifecycle;
import org.apache.axis.ime.internal.util.uuid.UUIDGenFactory;
+import org.apache.axis.components.logger.LogFactory;
+import org.apache.commons.logging.Log;
import java.util.Map;
@@ -75,6 +77,9 @@
public class MessageExchangeImpl
implements MessageExchange, MessageExchangeLifecycle {
+ protected static Log log =
+ LogFactory.getLog(MessageExchangeImpl.class.getName());
+
public static final long NO_TIMEOUT = -1;
public static final long DEFAULT_TIMEOUT = 1000 * 30;
@@ -103,6 +108,9 @@
MessageContext context,
MessageContextListener listener)
throws AxisFault {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeImpl::send");
+ }
MessageExchangeCorrelator correlator =
(MessageExchangeCorrelator) context.getProperty(
MessageExchangeConstants.MESSAGE_CORRELATOR_PROPERTY);
@@ -127,6 +135,9 @@
context,
faultListener,
statusListener));
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeImpl::send");
+ }
return correlator;
}
@@ -163,6 +174,9 @@
MessageExchangeCorrelator correlator,
long timeout)
throws AxisFault {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeImpl::receive");
+ }
holder = new Holder();
Listener listener = new Listener(holder);
setMessageExchangeFaultListener(listener);
@@ -175,6 +189,9 @@
} catch (InterruptedException ie) {
throw AxisFault.makeFault(ie);
}
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeImpl::receive");
+ }
if (holder.context != null) {
return holder.context;
}
@@ -200,12 +217,19 @@
MessageExchangeCorrelator correlator,
MessageContextListener listener)
throws AxisFault {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeImpl::receive");
+ }
provider.processReceive(
MessageExchangeReceiveContext.newInstance(
correlator,
listener,
faultListener,
statusListener));
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeImpl::receive");
+ }
+
}
/**
@@ -224,6 +248,9 @@
MessageContext context,
long timeout)
throws AxisFault {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeImpl::sendAndReceive");
+ }
holder = new Holder();
Listener listener = new Listener(holder);
setMessageExchangeFaultListener(listener);
@@ -236,6 +263,9 @@
} catch (InterruptedException ie) {
throw AxisFault.makeFault(ie);
}
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeImpl::sendAndReceive");
+ }
if (holder.context != null) {
return holder.context;
}
@@ -436,7 +466,13 @@
*/
public void awaitShutdown()
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeImpl::awaitShutdown");
+ }
provider.awaitShutdown();
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeImpl::awaitShutdown");
+ }
}
/**
@@ -444,7 +480,13 @@
*/
public void cleanup()
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeImpl::cleanup");
+ }
provider.cleanup();
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeImpl::cleanup");
+ }
}
/**
@@ -452,28 +494,52 @@
*/
public void awaitShutdown(long timeout)
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeImpl::awaitShutdown");
+ }
provider.awaitShutdown(timeout);
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeImpl::awaitShutdown");
+ }
}
/**
* @see org.apache.axis.ime.MessageExchangeLifecycle#init()
*/
public void init() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeImpl::init");
+ }
provider.init();
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeImpl::init");
+ }
}
/**
* @see org.apache.axis.ime.MessageExchangeLifecycle#shutdown()
*/
public void shutdown() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeImpl::shutdown");
+ }
provider.shutdown();
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeImpl::shutdown");
+ }
}
/**
* @see org.apache.axis.ime.MessageExchangeLifecycle#shutdown(boolean)
*/
public void shutdown(boolean force) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeImpl::shutdown");
+ }
provider.shutdown(force);
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeImpl::shutdown");
+ }
}
}
1.2 +13 -0 xml-axis/java/src/org/apache/axis/ime/internal/FirstComeFirstServeDispatchPolicy.java
Index: FirstComeFirstServeDispatchPolicy.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/FirstComeFirstServeDispatchPolicy.java,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -r1.1 -r1.2
--- FirstComeFirstServeDispatchPolicy.java 29 Oct 2002 05:15:29 -0000 1.1
+++ FirstComeFirstServeDispatchPolicy.java 29 Oct 2002 23:33:40 -0000 1.2
@@ -59,6 +59,8 @@
import org.apache.axis.ime.MessageContextListener;
import org.apache.axis.ime.MessageExchangeFaultListener;
import org.apache.axis.ime.internal.util.KeyedBuffer;
+import org.apache.axis.components.logger.LogFactory;
+import org.apache.commons.logging.Log;
/**
* @author James M Snell (jasnell@us.ibm.com)
@@ -66,6 +68,9 @@
public class FirstComeFirstServeDispatchPolicy
implements ReceivedMessageDispatchPolicy {
+ protected static Log log =
+ LogFactory.getLog(FirstComeFirstServeDispatchPolicy.class.getName());
+
protected KeyedBuffer RECEIVE_REQUESTS;
protected KeyedBuffer RECEIVE;
@@ -78,6 +83,10 @@
public void dispatch(
MessageExchangeSendContext context) {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: FirstComeFirstServeDispatchPolicy::dispatch");
+ }
// 1. Get the correlator
// 2. See if there are any receive requests based on the correlator
@@ -110,6 +119,10 @@
correlator, exception);
}
}
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: FirstComeFirstServeDispatchPolicy::dispatch");
+ }
+
}
}
1.8 +67 -10 xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeProvider.java
Index: MessageExchangeProvider.java
===================================================================
RCS file: /home/cvs/xml-axis/java/src/org/apache/axis/ime/internal/MessageExchangeProvider.java,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -r1.7 -r1.8
--- MessageExchangeProvider.java 29 Oct 2002 22:42:24 -0000 1.7
+++ MessageExchangeProvider.java 29 Oct 2002 23:33:40 -0000 1.8
@@ -67,6 +67,8 @@
import org.apache.axis.ime.internal.util.WorkerPool;
import org.apache.axis.ime.internal.util.KeyedBuffer;
import org.apache.axis.ime.internal.util.NonPersistentKeyedBuffer;
+import org.apache.axis.components.logger.LogFactory;
+import org.apache.commons.logging.Log;
import java.util.Map;
@@ -76,6 +78,9 @@
public abstract class MessageExchangeProvider
implements MessageExchangeFactory {
+ protected static Log log =
+ LogFactory.getLog(MessageExchangeProvider.class.getName());
+
public static final long SELECT_TIMEOUT = 1000 * 30;
public static final long DEFAULT_THREAD_COUNT = 5;
@@ -117,7 +122,13 @@
public void cleanup()
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeProvider::cleanup");
+ }
WORKERS.cleanup();
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeProvider::cleanup");
+ }
}
public void init() {
@@ -125,6 +136,9 @@
}
public void init(long THREAD_COUNT) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeProvider::init");
+ }
if (initialized)
throw new IllegalStateException(Messages.getMessage("illegalStateException00"));
for (int n = 0; n < THREAD_COUNT; n++) {
@@ -132,20 +146,35 @@
WORKERS.addWorker(new MessageReceiver(WORKERS, RECEIVE, getReceivedMessageDispatchPolicy(), getReceiveHandler()));
}
initialized = true;
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeProvider::init");
+ }
}
public void processReceive(
MessageExchangeReceiveContext context) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeProvider::processReceive");
+ }
RECEIVE_REQUESTS.put(
context.getMessageExchangeCorrelator(),
context);
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeProvider::processReceive");
+ }
}
public void processSend(
MessageExchangeSendContext context) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeProvider::processSend");
+ }
SEND.put(
context.getMessageExchangeCorrelator(),
context);
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeProvider::processSend");
+ }
}
public void shutdown() {
@@ -153,21 +182,39 @@
}
public void shutdown(boolean force) {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeProvider::shutdown");
+ }
if (!force) {
WORKERS.safeShutdown();
} else {
WORKERS.shutdown();
}
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeProvider::shutdown");
+ }
}
public void awaitShutdown()
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeProvider::awaitShutdown");
+ }
WORKERS.awaitShutdown();
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeProvider::awaitShutdown");
+ }
}
public void awaitShutdown(long shutdown)
throws InterruptedException {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeProvider::awaitShutdown");
+ }
WORKERS.awaitShutdown(shutdown);
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeProvider::awaitShutdown");
+ }
}
@@ -176,6 +223,9 @@
public static class MessageReceiver
implements Runnable {
+ protected static Log log =
+ LogFactory.getLog(MessageReceiver.class.getName());
+
protected WorkerPool pool;
protected KeyedBuffer channel;
protected ReceivedMessageDispatchPolicy policy;
@@ -196,6 +246,9 @@
* @see java.lang.Runnable#run()
*/
public void run() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeProvider.MessageReceiver::run");
+ }
try {
while (!pool.isShuttingDown()) {
MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT);
@@ -206,13 +259,12 @@
}
}
} catch (Throwable t) {
- // kill the thread if any type of exception occurs.
- // don't worry, we'll create another one to replace it
- // if we're not currently in the process of shutting down.
- // once I get the logging function plugged in, we'll
- // log whatever errors do occur
+ log.error(Messages.getMessage("fault00"), t);
} finally {
pool.workerDone(this);
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeProvider.MesageReceiver::run");
+ }
}
}
@@ -222,6 +274,9 @@
public static class MessageSender
implements Runnable {
+
+ protected static Log log =
+ LogFactory.getLog(MessageReceiver.class.getName());
protected WorkerPool pool;
protected KeyedBuffer channel;
@@ -243,6 +298,9 @@
* @see java.lang.Runnable#run()
*/
public void run() {
+ if (log.isDebugEnabled()) {
+ log.debug("Enter: MessageExchangeProvider.MessageSender::run");
+ }
try {
while (!pool.isShuttingDown()) {
MessageExchangeSendContext context = (MessageExchangeSendContext)channel.select(SELECT_TIMEOUT);
@@ -253,13 +311,12 @@
}
}
} catch (Throwable t) {
- // kill the thread if any type of exception occurs.
- // don't worry, we'll create another one to replace it
- // if we're not currently in the process of shutting down.
- // once I get the logging function plugged in, we'll
- // log whatever errors do occur
+ log.error(Messages.getMessage("fault00"), t);
} finally {
pool.workerDone(this);
+ if (log.isDebugEnabled()) {
+ log.debug("Exit: MessageExchangeProvider.MessageSender::run");
+ }
}
}