You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@synapse.apache.org by as...@apache.org on 2007/03/29 20:10:56 UTC

svn commit: r523787 - in /webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp: Axis2HttpRequest.java ClientHandler.java ServerHandler.java util/ util/PipeImpl.java

Author: asankha
Date: Thu Mar 29 11:10:55 2007
New Revision: 523787

URL: http://svn.apache.org/viewvc?view=rev&rev=523787
Log:
Fix SYNAPSE-86

Added:
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/PipeImpl.java
Modified:
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/Axis2HttpRequest.java
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
    webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java

Modified: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/Axis2HttpRequest.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/Axis2HttpRequest.java?view=diff&rev=523787&r1=523786&r2=523787
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/Axis2HttpRequest.java (original)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/Axis2HttpRequest.java Thu Mar 29 11:10:55 2007
@@ -21,6 +21,7 @@
 import org.apache.axis2.addressing.EndpointReference;
 import org.apache.axis2.context.MessageContext;
 import org.apache.axis2.AxisFault;
+import org.apache.axis2.transport.nhttp.util.PipeImpl;
 import org.apache.http.*;
 import org.apache.http.protocol.HTTP;
 import org.apache.http.entity.BasicHttpEntity;
@@ -34,6 +35,7 @@
 import java.io.OutputStream;
 import java.nio.channels.Pipe;
 import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
 import java.util.Map;
 import java.util.Iterator;
 
@@ -53,14 +55,14 @@
     /** the message context being sent */
     private MessageContext msgContext = null;
     /** the Pipe which facilitates the serialization output to be written to the channel */
-    private Pipe pipe = null;
+    private PipeImpl pipe = null;
 
     public Axis2HttpRequest(EndpointReference epr, HttpHost httpHost, MessageContext msgContext) {
         this.epr = epr;
         this.httpHost = httpHost;
         this.msgContext = msgContext;
         try {
-            this.pipe = Pipe.open();
+            this.pipe = new PipeImpl();
         } catch (IOException e) {
             log.error("Error creating pipe to write message body");
         }
@@ -111,7 +113,7 @@
      * Return the source channel of the pipe that bridges the serialized output to the socket
      * @return source channel to read serialized message contents
      */
-    public Pipe.SourceChannel getSourceChannel() {
+    public ReadableByteChannel getSourceChannel() {
         log.debug("get source channel of the pipe on which the outgoing response is written");
         return pipe.source();
     }

Modified: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java?view=diff&rev=523787&r1=523786&r2=523787
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java (original)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java Thu Mar 29 11:10:55 2007
@@ -30,6 +30,7 @@
 import org.apache.axis2.util.threadpool.DefaultThreadFactory;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.context.MessageContext;
+import org.apache.axis2.transport.nhttp.util.PipeImpl;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.axiom.soap.SOAP11Constants;
@@ -38,6 +39,8 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.Pipe;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
 import java.io.IOException;
 
 import edu.emory.mathcs.backport.java.util.concurrent.Executor;
@@ -205,7 +208,7 @@
     public void inputReady(final NHttpClientConnection conn, final ContentDecoder decoder) {
         HttpContext context = conn.getContext();
         HttpResponse response = conn.getHttpResponse();
-        Pipe.SinkChannel sink = (Pipe.SinkChannel) context.getAttribute(RESPONSE_SINK_CHANNEL);
+        WritableByteChannel sink = (WritableByteChannel) context.getAttribute(RESPONSE_SINK_CHANNEL);
         ByteBuffer inbuf = (ByteBuffer) context.getAttribute(REQUEST_BUFFER);
 
         try {
@@ -238,7 +241,7 @@
         HttpContext context = conn.getContext();
         HttpResponse response = conn.getHttpResponse();
 
-        Pipe.SourceChannel source = (Pipe.SourceChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL);
+        ReadableByteChannel source = (ReadableByteChannel) context.getAttribute(REQUEST_SOURCE_CHANNEL);
         ByteBuffer outbuf = (ByteBuffer) context.getAttribute(RESPONSE_BUFFER);
 
         try {
@@ -301,7 +304,7 @@
     private void processResponse(final NHttpClientConnection conn, HttpContext context, HttpResponse response) {
 
         try {
-            Pipe responsePipe = Pipe.open();
+            PipeImpl responsePipe = new PipeImpl();
             context.setAttribute(RESPONSE_SINK_CHANNEL, responsePipe.sink());
 
             BasicHttpEntity entity = new BasicHttpEntity();

Modified: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java?view=diff&rev=523787&r1=523786&r2=523787
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java (original)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerHandler.java Thu Mar 29 11:10:55 2007
@@ -24,6 +24,7 @@
 import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
 import org.apache.axis2.context.ConfigurationContext;
 import org.apache.axis2.util.threadpool.DefaultThreadFactory;
+import org.apache.axis2.transport.nhttp.util.PipeImpl;
 import org.apache.http.*;
 import org.apache.http.entity.BasicHttpEntity;
 import org.apache.http.entity.ByteArrayEntity;
@@ -43,6 +44,8 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.Channels;
 import java.nio.channels.Pipe;
+import java.nio.channels.WritableByteChannel;
+import java.nio.channels.ReadableByteChannel;
 
 /**
  * The server connection handler. An instance of this class is used by each IOReactor, to
@@ -104,8 +107,8 @@
         context.setAttribute(RESPONSE_BUFFER, ByteBuffer.allocate(2048));
 
         try {
-            Pipe requestPipe = Pipe.open();     // the pipe used to process the request
-            Pipe responsePipe = Pipe.open();    // the pipe used to process the response
+            PipeImpl requestPipe  = new PipeImpl(); // the pipe used to process the request
+            PipeImpl responsePipe = new PipeImpl(); // the pipe used to process the response
             context.setAttribute(REQUEST_SINK_CHANNEL, requestPipe.sink());
             context.setAttribute(RESPONSE_SOURCE_CHANNEL, responsePipe.source());
 
@@ -143,7 +146,7 @@
     public void inputReady(final NHttpServerConnection conn, final ContentDecoder decoder) {
 
         HttpContext context = conn.getContext();
-        Pipe.SinkChannel sink = (Pipe.SinkChannel) context.getAttribute(REQUEST_SINK_CHANNEL);
+        WritableByteChannel sink = (WritableByteChannel) context.getAttribute(REQUEST_SINK_CHANNEL);
         ByteBuffer inbuf = (ByteBuffer) context.getAttribute(REQUEST_BUFFER);
 
         try {
@@ -175,7 +178,7 @@
 
         HttpContext context = conn.getContext();
         HttpResponse response = conn.getHttpResponse();
-        Pipe.SourceChannel source = (Pipe.SourceChannel) context.getAttribute(RESPONSE_SOURCE_CHANNEL);
+        ReadableByteChannel source = (ReadableByteChannel) context.getAttribute(RESPONSE_SOURCE_CHANNEL);
         ByteBuffer outbuf = (ByteBuffer) context.getAttribute(RESPONSE_BUFFER);
 
         try {
@@ -264,7 +267,8 @@
      * @param e the exception encountered
      */
     public void exception(NHttpServerConnection conn, IOException e) {
-        if (e instanceof ConnectionClosedException) {
+        if (e instanceof ConnectionClosedException ||
+            "Connection reset by peer".equals(e.getMessage())) {
             log.debug("I/O error: " + e.getMessage());
         } else {
             log.error("I/O error: " + e.getMessage());

Added: webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/PipeImpl.java
URL: http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/PipeImpl.java?view=auto&rev=523787
==============================================================================
--- webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/PipeImpl.java (added)
+++ webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/util/PipeImpl.java Thu Mar 29 11:10:55 2007
@@ -0,0 +1,86 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.axis2.transport.nhttp.util;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.nio.channels.Pipe;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.channels.spi.SelectorProvider;
+import java.nio.ByteBuffer;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.io.IOException;
+import java.io.File;
+
+/**
+ * Creates a pipe suited to the runtime platform. Where File.separator is "/" (e.g. Linux, Solaris) a
+ * native java.nio.channels.Pipe is opened. Elsewhere (e.g. Windows, where that Pipe uses TCP ports
+ * bound to the loopback interface) a PipedInputStream/PipedOutputStream pair is wrapped as channels.
+ */
+public class PipeImpl {
+
+    private static final Log log = LogFactory.getLog(PipeImpl.class);
+
+    private ReadableByteChannel source; // read end, returned by source()
+    private WritableByteChannel sink; // write end, returned by sink()
+
+    private PipedOutputStream pipedOut; // only assigned on the simulated (non-native) path
+    protected static boolean useNative; // stays false unless the static initializer selects native pipes
+
+    static { // decide once, at class initialization, which pipe flavour every instance will use
+        if (!"/".equals(File.separator)) { // a separator other than "/" selects the simulated pipe
+            log.info("Using simulated buffered Pipes for event-driven to stream IO bridging");
+        } else {
+            log.info("Using native OS Pipes for event-driven to stream IO bridging");
+            useNative = true;
+        }
+    }
+
+    public PipeImpl() throws IOException { // builds both ends of the pipe according to useNative
+        if (useNative) {
+            Pipe pipe = Pipe.open(); // an IOException from here propagates to the caller
+            source = pipe.source();
+            sink = pipe.sink();
+
+        } else {
+            PipedInputStream pipedIn = new PipedInputStream();
+            try {
+                pipedOut = new PipedOutputStream(pipedIn); // connects the writer to the reader just created
+            } catch (IOException e) { // NOTE(review): not rethrown although the constructor declares IOException
+                e.printStackTrace(); // NOTE(review): pipedOut stays null here; newChannel below presumably rejects null - confirm
+            }
+
+            source = Channels.newChannel(pipedIn); // expose the stream pair through the channel interfaces
+            sink = Channels.newChannel(pipedOut);
+        }
+    }
+
+    public ReadableByteChannel source() { // returns the channel to read piped bytes from
+        return source;
+    }
+
+    public WritableByteChannel sink() { // returns the channel to write bytes into the pipe
+        return sink;
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: synapse-dev-unsubscribe@ws.apache.org
For additional commands, e-mail: synapse-dev-help@ws.apache.org