You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by as...@apache.org on 2008/06/02 06:56:14 UTC
svn commit: r662321 - in
/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp:
Axis2HttpRequest.java ClientHandler.java HttpCoreNIOSender.java
Author: asankha
Date: Sun Jun 1 21:56:13 2008
New Revision: 662321
URL: http://svn.apache.org/viewvc?rev=662321&view=rev
Log:
fix SYNAPSE-338
Modified:
synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java
synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java?rev=662321&r1=662320&r2=662321&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/Axis2HttpRequest.java Sun Jun 1 21:56:13 2008
@@ -208,39 +208,39 @@
}
synchronized(this) {
- while (!readyToStream) {
+ while (!readyToStream && !completed) {
try {
this.wait();
} catch (InterruptedException ignore) {}
}
}
- OutputStream out = new ContentOutputStream(outputBuffer);
- try {
- messageFormatter.writeTo(msgContext, format, out, true);
- } catch (Exception e) {
- Throwable t = e.getCause();
- if (t != null && t.getCause() != null && t.getCause() instanceof ClosedChannelException) {
- if (log.isDebugEnabled()) {
- log.debug("Ignore closed channel exception, as the " +
- "SessionRequestCallback handles this exception");
- }
- } else {
- /* close PipeImpl will manually raise exception
- while streaming, so blocking status will be released */
- if (e instanceof AxisFault) {
- throw (AxisFault) e;
+ if (!completed) {
+ OutputStream out = new ContentOutputStream(outputBuffer);
+ try {
+ messageFormatter.writeTo(msgContext, format, out, true);
+ } catch (Exception e) {
+ Throwable t = e.getCause();
+ if (t != null && t.getCause() != null && t.getCause() instanceof ClosedChannelException) {
+ if (log.isDebugEnabled()) {
+ log.debug("Ignore closed channel exception, as the " +
+ "SessionRequestCallback handles this exception");
+ }
} else {
- handleException("Error streaming message context", e);
+ if (e instanceof AxisFault) {
+ throw (AxisFault) e;
+ } else {
+ handleException("Error streaming message context", e);
+ }
}
}
- }
- finally {
- try {
- out.flush();
- out.close();
- } catch (IOException e) {
- handleException("Error closing outgoing message stream", e);
+ finally {
+ try {
+ out.flush();
+ out.close();
+ } catch (IOException e) {
+ handleException("Error closing outgoing message stream", e);
+ }
}
}
}
@@ -257,5 +257,8 @@
public void setCompleted(boolean completed) {
this.completed = completed;
+ synchronized(this) {
+ this.notifyAll();
+ }
}
}
Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java?rev=662321&r1=662320&r2=662321&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/ClientHandler.java Sun Jun 1 21:56:13 2008
@@ -261,7 +261,7 @@
* @param exceptionToRaise an Exception to be returned to the MR on failure
*/
private void checkAxisRequestComplete(NHttpClientConnection conn,
- String errorMessage, Exception exceptionToRaise) {
+ final String errorMessage, final Exception exceptionToRaise) {
Axis2HttpRequest axis2Request = (Axis2HttpRequest)
conn.getContext().getAttribute(AXIS2_HTTP_REQUEST);
@@ -273,32 +273,36 @@
return; // no need to continue
}
- MessageContext mc = axis2Request.getMsgContext();
+ final MessageContext mc = axis2Request.getMsgContext();
if (mc.getAxisOperation() != null &&
mc.getAxisOperation().getMessageReceiver() != null) {
- MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
- try {
- MessageContext nioFaultMessageContext = null;
- if (errorMessage != null) {
- nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
- mc, new AxisFault(errorMessage));
- } else if (exceptionToRaise != null) {
- nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
- /** this is not a mistake I do NOT want getMessage()*/
- mc, new AxisFault(exceptionToRaise.toString(), exceptionToRaise));
- }
+ workerPool.execute( new Runnable() {
+ public void run() {
+ MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
+ try {
+ MessageContext nioFaultMessageContext = null;
+ if (errorMessage != null) {
+ nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
+ mc, new AxisFault(errorMessage));
+ } else if (exceptionToRaise != null) {
+ nioFaultMessageContext = MessageContextBuilder.createFaultMessageContext(
+ /** this is not a mistake I do NOT want getMessage()*/
+ mc, new AxisFault(exceptionToRaise.toString(), exceptionToRaise));
+ }
+
+ if (nioFaultMessageContext != null) {
+ nioFaultMessageContext.setProperty(
+ NhttpConstants.SENDING_FAULT, Boolean.TRUE);
+ mr.receive(nioFaultMessageContext);
+ }
- if (nioFaultMessageContext != null) {
- nioFaultMessageContext.setProperty(
- NhttpConstants.SENDING_FAULT, Boolean.TRUE);
- mr.receive(nioFaultMessageContext);
+ } catch (AxisFault af) {
+ log.error("Unable to report back failure to the message receiver", af);
+ }
}
-
- } catch (AxisFault af) {
- log.error("Unable to report back failure to the message receiver", af);
- }
+ });
}
}
}
@@ -546,6 +550,10 @@
}
+ public void execute(Runnable task) {
+ workerPool.execute(task);
+ }
+
// ----------- utility methods -----------
private void handleException(String msg, Exception e, NHttpClientConnection conn) {
Modified: synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java
URL: http://svn.apache.org/viewvc/synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java?rev=662321&r1=662320&r2=662321&view=diff
==============================================================================
--- synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java (original)
+++ synapse/branches/1.2/modules/transports/src/main/java/org/apache/synapse/transport/nhttp/HttpCoreNIOSender.java Sun Jun 1 21:56:13 2008
@@ -476,7 +476,7 @@
* related to the outgoing request
* @return a Session request callback
*/
- private static SessionRequestCallback getSessionRequestCallback() {
+ private SessionRequestCallback getSessionRequestCallback() {
return new SessionRequestCallback() {
public void completed(SessionRequest request) {
}
@@ -503,7 +503,7 @@
axis2Request.setCompleted(true);
MessageContext mc = axis2Request.getMsgContext();
- MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
+ final MessageReceiver mr = mc.getAxisOperation().getMessageReceiver();
if (mr != null) {
try {
@@ -520,10 +520,19 @@
/** this is not a mistake I do NOT want getMessage()*/
axisFault = new AxisFault(exception.toString(), exception);
}
- MessageContext nioFaultMessageContext =
+ final MessageContext nioFaultMessageContext =
MessageContextBuilder.createFaultMessageContext(mc, axisFault);
nioFaultMessageContext.setProperty(NhttpConstants.SENDING_FAULT, Boolean.TRUE);
- mr.receive(nioFaultMessageContext);
+
+ handler.execute(new Runnable() {
+ public void run() {
+ try {
+ mr.receive(nioFaultMessageContext);
+ } catch (AxisFault af) {
+ log.error("Error processing fault message context", af);
+ }
+ }
+ });
} catch (AxisFault af) {
log.error("Unable to report back failure to the message receiver", af);