You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by ve...@apache.org on 2009/01/03 01:43:15 UTC

svn commit: r730880 - in /webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon: ./ core/ core/filter/ eclipse/ui/

Author: veithen
Date: Fri Jan  2 16:43:14 2009
New Revision: 730880

URL: http://svn.apache.org/viewvc?rev=730880&view=rev
Log:
TCPMon: Refactored the code responsible for request/response processing ("Host" header rewriting, plain HTTP <-> proxy HTTP switching, XML formatting, etc.) to make it more readable, manageable and reusable. The new code is organized around a stream/pipeline/filter metaphor. The complexity of stream processing (buffering, rewriting, lookahead, etc.) is hidden in the new Pipeline class, and a set of distinct filters replaces the processing code contained in AbstractConnection and AbstractSocketRR.

The modifications should also solve WSCOMMONS-80, WSCOMMONS-380 and WSCOMMONS-425.

Added:
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/SocketRR.java
      - copied, changed from r730497, webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractSocketRR.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/CharsetDecoderFilter.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpHeaderRewriter.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyClientHandler.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyServerHandler.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpRequestFilter.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Pipeline.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/RequestLineExtractor.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Stream.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/StreamFilter.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Tee.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/XmlFormatFilter.java
Removed:
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SocketRR.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractSocketRR.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/SocketRR.java
Modified:
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/Connection.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SlowLinkSimulator.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractConnection.java
    webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/Connection.java

Modified: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/Connection.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/Connection.java?rev=730880&r1=730879&r2=730880&view=diff
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/Connection.java (original)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/Connection.java Fri Jan  2 16:43:14 2009
@@ -21,10 +21,8 @@
 import javax.swing.ListSelectionModel;
 
 import org.apache.ws.commons.tcpmon.core.AbstractConnection;
-import org.apache.ws.commons.tcpmon.core.AbstractSocketRR;
 
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.Socket;
 
 /**
@@ -126,21 +124,6 @@
         outputWriter = new JTextAreaWriter(outputText);
     }
     
-    protected AbstractSocketRR createInputSocketRR(Socket inSocket, InputStream inputStream,
-            Socket outSocket, OutputStream outputStream, boolean format,
-            SlowLinkSimulator slowLink) {
-        return new SocketRR(this, inSocket, inputStream, outSocket, outputStream,
-                format, listener.tableModel,
-                listener.connections.indexOf(this) + 1, slowLink, inputWriter);
-    }
-
-    protected AbstractSocketRR createOutputSocketRR(Socket outSocket, InputStream inputStream,
-            Socket inSocket, OutputStream outputStream, boolean format,
-            SlowLinkSimulator slowLink) {
-        return new SocketRR(this, outSocket, inputStream, inSocket, outputStream,
-                format, null, 0, slowLink, outputWriter);
-    }
-
     private void setValue(int column, String value) {
         int index = listener.connections.indexOf(this);
         if (index >= 0) {

Modified: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SlowLinkSimulator.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SlowLinkSimulator.java?rev=730880&r1=730879&r2=730880&view=diff
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SlowLinkSimulator.java (original)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SlowLinkSimulator.java Fri Jan  2 16:43:14 2009
@@ -16,10 +16,15 @@
 
 package org.apache.ws.commons.tcpmon;
 
+import java.io.IOException;
+
+import org.apache.ws.commons.tcpmon.core.filter.Stream;
+import org.apache.ws.commons.tcpmon.core.filter.StreamFilter;
+
 /**
  * class to simulate slow connections by slowing down the system
  */
-public class SlowLinkSimulator {
+public class SlowLinkSimulator implements StreamFilter {
 
 	/**
      * Field delayBytes
@@ -104,6 +109,11 @@
         }
     }
 
+    public void invoke(Stream stream) throws IOException {
+        pump(stream.available());
+        stream.skipAll();
+    }
+
     /**
      * get the current byte count
      *

Modified: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractConnection.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractConnection.java?rev=730880&r1=730879&r2=730880&view=diff
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractConnection.java (original)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractConnection.java Fri Jan  2 16:43:14 2009
@@ -18,6 +18,14 @@
 
 import org.apache.ws.commons.tcpmon.SlowLinkSimulator;
 import org.apache.ws.commons.tcpmon.TCPMonBundle;
+import org.apache.ws.commons.tcpmon.core.filter.CharsetDecoderFilter;
+import org.apache.ws.commons.tcpmon.core.filter.HttpHeaderRewriter;
+import org.apache.ws.commons.tcpmon.core.filter.HttpProxyClientHandler;
+import org.apache.ws.commons.tcpmon.core.filter.HttpProxyServerHandler;
+import org.apache.ws.commons.tcpmon.core.filter.Pipeline;
+import org.apache.ws.commons.tcpmon.core.filter.RequestLineExtractor;
+import org.apache.ws.commons.tcpmon.core.filter.Tee;
+import org.apache.ws.commons.tcpmon.core.filter.XmlFormatFilter;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -26,7 +34,6 @@
 import java.io.StringWriter;
 import java.io.Writer;
 import java.net.Socket;
-import java.net.URL;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -65,7 +72,7 @@
     /**
      * Field outSocket
      */
-    Socket outSocket = null;
+    volatile Socket outSocket = null;
 
     /**
      * Field clientThread
@@ -80,12 +87,12 @@
     /**
      * Field rr1
      */
-    AbstractSocketRR rr1 = null;
+    SocketRR rr1 = null;
 
     /**
      * Field rr2
      */
-    AbstractSocketRR rr2 = null;
+    SocketRR rr2 = null;
 
     /**
      * Field inputStream
@@ -155,7 +162,6 @@
             int targetPort = config.getTargetPort();
             InputStream tmpIn1 = inputStream;
             OutputStream tmpOut1 = null;
-            InputStream tmpIn2 = null;
             OutputStream tmpOut2 = null;
             if (tmpIn1 == null) {
                 tmpIn1 = inSocket.getInputStream();
@@ -163,157 +169,61 @@
             if (inSocket != null) {
                 tmpOut1 = inSocket.getOutputStream();
             }
-            String bufferedData = null;
-            StringBuffer buf = null;
-            if (config.isProxy() || (HTTPProxyHost != null)) {
-
-                // Check if we're a proxy
-                byte[] b = new byte[1];
-                buf = new StringBuffer();
-                String s;
-                for (; ;) {
-                    int len;
-                    len = tmpIn1.read(b, 0, 1);
-                    if (len == -1) {
-                        break;
-                    }
-                    s = new String(b);
-                    buf.append(s);
-                    if (b[0] != '\n') {
-                        continue;
-                    }
-                    break;
-                }
-                bufferedData = buf.toString();
-                inputWriter.write(bufferedData);
-                if (bufferedData.startsWith("GET ")
-                        || bufferedData.startsWith("POST ")
-                        || bufferedData.startsWith("PUT ")
-                        || bufferedData.startsWith("DELETE ")) {
-                    int start, end;
-                    URL url;
-                    start = bufferedData.indexOf(' ') + 1;
-                    while (bufferedData.charAt(start) == ' ') {
-                        start++;
-                    }
-                    end = bufferedData.indexOf(' ', start);
-                    String urlString = bufferedData.substring(start, end);
-                    if (urlString.charAt(0) == '/') {
-                        urlString = urlString.substring(1);
-                    }
-                    if (config.isProxy()) {
-                        url = new URL(urlString);
-                        targetHost = url.getHost();
-                        targetPort = url.getPort();
-                        if (targetPort == -1) {
-                            targetPort = 80;
-                        }
-                        setOutHost(targetHost);
-                        bufferedData = bufferedData.substring(0, start)
-                                + url.getFile()
-                                + bufferedData.substring(end);
-                    } else {
-                        url = new URL("http://" + targetHost + ":"
-                                + targetPort + "/" + urlString);
-                        setOutHost(targetHost);
-                        bufferedData = bufferedData.substring(0, start)
-                                + url.toExternalForm()
-                                + bufferedData.substring(end);
-                        targetHost = HTTPProxyHost;
-                        targetPort = HTTPProxyPort;
-                    }
-                }
+            
+            Pipeline requestPipeline = new Pipeline();
+            requestPipeline.addFilter(new RequestLineExtractor(50) {
+                protected void done(String requestLine) {
+                    setRequest(requestLine);
+                }
+            });
+            if (config.isProxy()) {
+                requestPipeline.addFilter(new HttpProxyServerHandler() {
+                    protected void handleConnection(String host, int port) throws IOException {
+                        outSocket = new Socket(host, port);
+                    }
+                });
+            } else if (HTTPProxyHost != null) {
+                requestPipeline.addFilter(new HttpProxyClientHandler(targetHost, targetPort));
+                outSocket = new Socket(HTTPProxyHost, HTTPProxyPort);
             } else {
-                //
-                // Change Host: header to point to correct host
-                //
-                byte[] b1 = new byte[1];
-                buf = new StringBuffer();
-                String s1;
-                String lastLine = null;
-                for (; ;) {
-                    int len;
-                    len = tmpIn1.read(b1, 0, 1);
-                    if (len == -1) {
-                        break;
-                    }
-                    s1 = new String(b1);
-                    buf.append(s1);
-                    if (b1[0] != '\n') {
-                        continue;
-                    }
-
-                    // we have a complete line
-                    String line = buf.toString();
-                    buf.setLength(0);
-
-                    // check to see if we have found Host: header
-                    if (line.startsWith("Host: ")) {
-
-                        // we need to update the hostname to target host
-                        String newHost = "Host: " + targetHost + ":"
-                                + targetPort + "\r\n";
-                        bufferedData = bufferedData.concat(newHost);
-                        break;
-                    }
-
-                    // add it to our headers so far
-                    if (bufferedData == null) {
-                        bufferedData = line;
-                    } else {
-                        bufferedData = bufferedData.concat(line);
-                    }
-
-                    // failsafe
-                    if (line.equals("\r\n")) {
-                        break;
-                    }
-                    if ("\n".equals(lastLine) && line.equals("\n")) {
-                        break;
-                    }
-                    lastLine = line;
-                }
-                if (bufferedData != null) {
-                    inputWriter.write(bufferedData);
-                    int idx = (bufferedData.length() < 50)
-                            ? bufferedData.length()
-                            : 50;
-                    s1 = bufferedData.substring(0, idx);
-                    int i = s1.indexOf('\n');
-                    if (i > 0) {
-                        s1 = s1.substring(0, i - 1);
-                    }
-                    s1 = s1 + "                           "
-                            + "                       ";
-                    s1 = s1.substring(0, 51);
-                    setRequest(s1);
-                }
+                requestPipeline.addFilter(new HttpHeaderRewriter("Host", targetHost + ":" + targetPort));
+                outSocket = new Socket(targetHost, targetPort);
             }
-            if (targetPort == -1) {
-                targetPort = 80;
+            requestPipeline.addFilter(config.getSlowLink());
+            Tee requestTee = new Tee();
+            requestPipeline.addFilter(requestTee);
+            if (config.isXmlFormat()) {
+                requestPipeline.addFilter(new XmlFormatFilter(3));
+            }
+            requestPipeline.addFilter(new CharsetDecoderFilter(inputWriter));
+            
+            // If we act as a proxy, we first need to read the start of the request before
+            // the outSocket is available.
+            while (outSocket == null) {
+                requestPipeline.readFrom(tmpIn1);
             }
-            outSocket = new Socket(targetHost, targetPort);
-            tmpIn2 = outSocket.getInputStream();
+            
             tmpOut2 = outSocket.getOutputStream();
-            SlowLinkSimulator slowLink = config.getSlowLink();
-            if (bufferedData != null) {
-                byte[] b = bufferedData.getBytes();
-                tmpOut2.write(b);
-                slowLink.pump(b.length);
+            requestTee.setOutputStream(tmpOut2);
+            
+            Pipeline responsePipeline = new Pipeline();
+            responsePipeline.addFilter(new SlowLinkSimulator(config.getSlowLink()));
+            if (tmpOut1 != null) {
+                responsePipeline.addFilter(new Tee(tmpOut1));
             }
-            boolean format = config.isXmlFormat();
-
+            if (config.isXmlFormat()) {
+                responsePipeline.addFilter(new XmlFormatFilter(3));
+            }
+            responsePipeline.addFilter(new CharsetDecoderFilter(outputWriter));
+            
             // this is the channel to the endpoint
-            rr1 = createInputSocketRR(inSocket, tmpIn1, outSocket, tmpOut2,
-                    format, slowLink);
-
-            // create the response slow link from the inbound slow link
-            SlowLinkSimulator responseLink =
-                    new SlowLinkSimulator(slowLink);
+            rr1 = new SocketRR(this, inSocket, tmpIn1, outSocket, tmpOut2, requestPipeline);
 
             // this is the channel from the endpoint
-            rr2 = createOutputSocketRR(outSocket, tmpIn2, inSocket, tmpOut1,
-                    format, responseLink);
+            rr2 = new SocketRR(this, outSocket, outSocket.getInputStream(), inSocket, tmpOut1, responsePipeline);
+            
+            rr1.start();
+            rr2.start();
             
             while ((rr1 != null) || (rr2 != null)) {
 
@@ -402,12 +312,6 @@
     }
 
     protected abstract void init(String time, String fromHost, String targetHost);
-    protected abstract AbstractSocketRR createInputSocketRR(Socket inSocket,
-            InputStream inputStream, Socket outSocket, OutputStream outputStream, boolean format,
-            SlowLinkSimulator slowLink);
-    protected abstract AbstractSocketRR createOutputSocketRR(Socket outSocket,
-            InputStream inputStream, Socket inSocket, OutputStream outputStream, boolean format,
-            SlowLinkSimulator slowLink);
     protected abstract void setOutHost(String outHost);
     protected abstract void setState(String state);
     protected abstract void setRequest(String request);

Copied: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/SocketRR.java (from r730497, webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractSocketRR.java)
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/SocketRR.java?p2=webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/SocketRR.java&p1=webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractSocketRR.java&r1=730497&r2=730880&rev=730880&view=diff
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractSocketRR.java (original)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/SocketRR.java Fri Jan  2 16:43:14 2009
@@ -16,18 +16,17 @@
 
 package org.apache.ws.commons.tcpmon.core;
 
-import org.apache.ws.commons.tcpmon.SlowLinkSimulator;
+import org.apache.ws.commons.tcpmon.core.filter.Pipeline;
 
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.io.Writer;
 import java.net.Socket;
 
 /**
  * this class handles the pumping of data from the incoming socket to the
  * outgoing socket
  */
-public abstract class AbstractSocketRR extends Thread {
+public class SocketRR extends Thread {
     private final AbstractConnection connection;
 
     /**
@@ -51,11 +50,6 @@
     OutputStream out = null;
 
     /**
-     * Field xmlFormat
-     */
-    boolean xmlFormat;
-
-    /**
      * Field done
      */
     volatile boolean done = false;
@@ -65,13 +59,8 @@
      */
     volatile long elapsed = 0;
     
-    /**
-     * Field slowLink
-     */
-    SlowLinkSimulator slowLink;
-
-    private final Writer writer;
-
+    private final Pipeline pipeline;
+    
     /**
      * Constructor SocketRR
      *
@@ -87,18 +76,15 @@
      * @param type
      * @param slowLink
      */
-    public AbstractSocketRR(AbstractConnection connection, Socket inputSocket,
+    public SocketRR(AbstractConnection connection, Socket inputSocket,
                     InputStream inputStream, Socket outputSocket,
-                    OutputStream outputStream, boolean format,
-                    SlowLinkSimulator slowLink, Writer writer) {
+                    OutputStream outputStream, Pipeline pipeline) {
         this.connection = connection;
         inSocket = inputSocket;
         in = inputStream;
         outSocket = outputSocket;
         out = outputStream;
-        xmlFormat = format;
-        this.slowLink = slowLink;
-        this.writer = writer;
+        this.pipeline = pipeline;
     }
 
     /**
@@ -119,152 +105,11 @@
      */
     public void run() {
         try {
-            byte[] buffer = new byte[4096];
-            byte[] tmpbuffer = new byte[8192];
-            int saved = 0;
-            int len;
-            int i1, i2;
-            int i;
-            int reqSaved = 0;
-            int tabWidth = 3;
-            boolean atMargin = true;
-            int thisIndent = -1, nextIndent = -1, previousIndent = -1;
-            if (isSaveFirstLine()) {
-                String tmpStr = getSavedFirstLine();
-                if (!"".equals(tmpStr)) {
-                    reqSaved = tmpStr.length();
-                }
+            // TODO: we should distinguish here between exceptions thrown when reading from the
+            //       input stream and exceptions thrown in the pipeline
+            while (pipeline.readFrom(in) != -1) {
+                // Just loop
             }
-            long start = System.currentTimeMillis();
-            a:
-            for (; ;) {
-                
-                elapsed = System.currentTimeMillis() - start;
-                
-                if (done) {
-                    break;
-                }
-                
-                // try{
-                // len = in.available();
-                // }catch(Exception e){len=0;}
-                len = buffer.length;
-
-                // Used to be 1, but if we block it doesn't matter
-                // however 1 will break with some servers, including apache
-                if (len == 0) {
-                    len = buffer.length;
-                }
-                if (saved + len > buffer.length) {
-                    len = buffer.length - saved;
-                }
-                int len1 = 0;
-                while (len1 == 0) {
-                    try {
-                        len1 = in.read(buffer, saved, len);
-                    } catch (Exception ex) {
-                        if (done && (saved == 0)) {
-                            break a;
-                        }
-                        len1 = -1;
-                        break;
-                    }
-                }
-                len = len1;
-                if ((len == -1) && (saved == 0)) {
-                    break;
-                }
-                if (len == -1) {
-                    done = true;
-                }
-
-                // No matter how we may (or may not) format it, send it
-                // on unformatted - we don't want to mess with how its
-                // sent to the other side, just how its displayed
-                if ((out != null) && (len > 0)) {
-                    slowLink.pump(len);
-                    out.write(buffer, saved, len);
-                }
-                
-                if (isSaveFirstLine() && (reqSaved < 50)) {
-                    String old = getSavedFirstLine();
-                    old = old + new String(buffer, saved, len);
-                    if (old.length() > 50) {
-                        old = old.substring(0, 50);
-                    }
-                    reqSaved = old.length();
-                    if ((i = old.indexOf('\n')) > 0) {
-                        old = old.substring(0, i - 1);
-                        reqSaved = 50;
-                    }
-                    setSavedFirstLine(old);
-                }
-                
-                
-                if (xmlFormat) {
-
-                    // Do XML Formatting
-                    boolean inXML = false;
-                    int bufferLen = saved;
-                    if (len != -1) {
-                        bufferLen += len;
-                    }
-                    i1 = 0;
-                    i2 = 0;
-                    saved = 0;
-                    for (; i1 < bufferLen; i1++) {
-
-                        // Except when we're at EOF, saved last char
-                        if ((len != -1) && (i1 + 1 == bufferLen)) {
-                            saved = 1;
-                            break;
-                        }
-                        thisIndent = -1;
-                        if ((buffer[i1] == '<')
-                                && (buffer[i1 + 1] != '/')) {
-                            previousIndent = nextIndent++;
-                            thisIndent = nextIndent;
-                            inXML = true;
-                        }
-                        if ((buffer[i1] == '<')
-                                && (buffer[i1 + 1] == '/')) {
-                            if (previousIndent > nextIndent) {
-                                thisIndent = nextIndent;
-                            }
-                            previousIndent = nextIndent--;
-                            inXML = true;
-                        }
-                        if ((buffer[i1] == '/')
-                                && (buffer[i1 + 1] == '>')) {
-                            previousIndent = nextIndent--;
-                            inXML = true;
-                        }
-                        if (thisIndent != -1) {
-                            if (thisIndent > 0) {
-                                tmpbuffer[i2++] = (byte) '\n';
-                            }
-                            for (i = tabWidth * thisIndent; i > 0; i--) {
-                                tmpbuffer[i2++] = (byte) ' ';
-                            }
-                        }
-                        atMargin = ((buffer[i1] == '\n')
-                                || (buffer[i1] == '\r'));
-                        if (!inXML || !atMargin) {
-                            tmpbuffer[i2++] = buffer[i1];
-                        }
-                    }
-                    
-                    writer.write(new String(tmpbuffer, 0, i2));
-
-                    // Shift saved bytes to the beginning
-                    for (i = 0; i < saved; i++) {
-                        buffer[i] = buffer[bufferLen - saved + i];
-                    }
-                } else {
-                    writer.write(new String(buffer, 0, len));
-                }
-            }
-
         } catch (Exception e) {
             e.printStackTrace();
         } finally {
@@ -322,8 +167,4 @@
             e.printStackTrace();
         }
     }
-    
-    protected abstract boolean isSaveFirstLine();
-    protected abstract String getSavedFirstLine();
-    protected abstract void setSavedFirstLine(String value);
 }

Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/CharsetDecoderFilter.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/CharsetDecoderFilter.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/CharsetDecoderFilter.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/CharsetDecoderFilter.java Fri Jan  2 16:43:14 2009
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+import java.io.Writer;
+
+/**
+ * Filter that decodes the stream to character data and sends it to a {@link Writer}.
+ */
+public class CharsetDecoderFilter implements StreamFilter {
+    private final Writer writer;
+
+    public CharsetDecoderFilter(Writer writer) {
+        this.writer = writer;
+    }
+
+    public void invoke(Stream stream) throws IOException {
+        StringBuffer buffer = new StringBuffer(stream.available());
+        while (stream.available() > 0) {
+            buffer.append((char)stream.skip());
+        }
+        writer.write(buffer.toString());
+    }
+}

Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpHeaderRewriter.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpHeaderRewriter.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpHeaderRewriter.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpHeaderRewriter.java Fri Jan  2 16:43:14 2009
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+/**
+ * Filter that replaces the value of a given HTTP header.
+ */
+public class HttpHeaderRewriter extends HttpRequestFilter {
+    private final String headerName;
+    private final String newValue;
+    
+    public HttpHeaderRewriter(String headerName, String newValue) {
+        this.headerName = headerName;
+        this.newValue = newValue;
+    }
+
+    protected String processHeader(String name, String value) {
+        return headerName.equals(name) ? newValue : value;
+    }
+}

Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyClientHandler.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyClientHandler.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyClientHandler.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyClientHandler.java Fri Jan  2 16:43:14 2009
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+
+/** Filter that rewrites a plain HTTP request to an HTTP proxy request. */
+public class HttpProxyClientHandler extends HttpRequestFilter {
+    private final String targetHost;
+    private final int targetPort;
+    
+    public HttpProxyClientHandler(String targetHost, int targetPort) {
+        this.targetHost = targetHost;
+        this.targetPort = targetPort;
+    }
+    
+    /** Prefixes the request target with http://targetHost:targetPort; a malformed request line raises IOException. */
+    protected String processRequest(String request) throws IOException {
+        String[] parts = request.split(" ");
+        if (parts.length < 3) { throw new IOException("Malformed HTTP request line: " + request); }
+        return parts[0] + " http://" + targetHost + ":" + targetPort + parts[1] + " " + parts[2];
+    }
+}

Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyServerHandler.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyServerHandler.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyServerHandler.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyServerHandler.java Fri Jan  2 16:43:14 2009
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+import java.net.URL;
+
+/** Filter that rewrites an HTTP proxy request to a plain HTTP request. */
+public abstract class HttpProxyServerHandler extends HttpRequestFilter {
+    /** Reports the target host and port via handleConnection and returns the request line with a path-only target. */
+    protected String processRequest(String request) throws IOException {
+        String[] parts = request.split(" ");
+        if (parts.length < 3) { throw new IOException("Malformed HTTP request line: " + request); }
+        URL url = new URL(parts[1]);
+        handleConnection(url.getHost(), url.getPort() == -1 ? 80 : url.getPort()); // -1: no port given in the URL
+        String file = url.getFile(); // path plus query; empty when the URL has no path (e.g. http://host)
+        return parts[0] + " " + (file.startsWith("/") ? file : "/" + file) + " " + parts[2];
+    }
+    /** Called with the host and port taken from the request URL, before the rewritten request line is returned. */
+    protected abstract void handleConnection(String host, int port) throws IOException;
+}

Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpRequestFilter.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpRequestFilter.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpRequestFilter.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpRequestFilter.java Fri Jan  2 16:43:14 2009
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+
+/**
+ * Abstract filter that allows HTTP request rewriting; data following the header section is passed through.
+ */
+public abstract class HttpRequestFilter implements StreamFilter {
+    private static final int STATE_REQUEST = 0; // waiting for the complete request line
+    private static final int STATE_HEADER = 1; // processing header lines
+    private static final int STATE_DONE = 2; // blank line seen: everything else is skipped unchanged
+    
+    private int state = STATE_REQUEST;
+    
+    public void invoke(Stream stream) throws IOException {
+        while (stream.available() > 0) {
+            switch (state) {
+                case STATE_REQUEST: {
+                    int eol = searchEndOfLine(stream);
+                    if (eol == -1) {
+                        // EOL not yet available; maybe next time...
+                        return;
+                    }
+                    String orgRequest = getString(stream, 0, eol);
+                    String request = processRequest(orgRequest);
+                    if (orgRequest.equals(request)) {
+                        stream.skip(eol+2);
+                    } else {
+                        stream.discard(eol);
+                        insert(stream, request);
+                        stream.skip(2);
+                    }
+                    state = STATE_HEADER;
+                    break;
+                }
+                case STATE_HEADER: {
+                    int eol = searchEndOfLine(stream);
+                    if (eol == -1) {
+                        return;
+                    }
+                    if (eol == 0) {
+                        state = STATE_DONE;
+                        break;
+                    }
+                    int colon = 0;
+                    while (colon < eol && stream.get(colon) != ':') {
+                        colon++;
+                    }
+                    if (colon == eol) { // no colon: not a "name: value" line, so pass it through unchanged
+                        stream.skip(eol+2);
+                        break;
+                    }
+                    String name = getString(stream, 0, colon);
+                    int valueStart = colon+1;
+                    while (stream.get(valueStart) == ' ') {
+                        valueStart++;
+                    }
+                    String orgValue = getString(stream, valueStart, eol);
+                    String value = processHeader(name, orgValue);
+                    if (value == null) {
+                        stream.discard(eol+2);
+                    } else if (value.equals(orgValue)) {
+                        stream.skip(eol+2);
+                    } else {
+                        stream.skip(valueStart);
+                        stream.discard(eol-valueStart);
+                        insert(stream, value);
+                        stream.skip(2);
+                    }
+                    break;
+                }
+                default:
+                    stream.skipAll();
+            }
+        }
+    }
+    /** Returns the offset of the first CR LF pair in the available data, or -1 if there is none yet. */
+    private static int searchEndOfLine(Stream stream) {
+        for (int i=0; i<stream.available()-1; i++) {
+            if (stream.get(i) == '\r' && stream.get(i+1) == '\n') {
+                return i;
+            }
+        }
+        return -1;
+    }
+    /** Converts the bytes at offsets begin (inclusive) to end (exclusive) into a string, one character per byte. */
+    private static String getString(Stream stream, int begin, int end) {
+        StringBuffer buffer = new StringBuffer(end-begin);
+        for (int i=begin; i<end; i++) {
+            buffer.append((char)(stream.get(i) & 0xFF)); // mask so that bytes >= 0x80 are not sign-extended
+        }
+        return buffer.toString();
+    }
+    /** Inserts the ASCII encoding of the given string at the current position in the stream. */
+    private static void insert(Stream stream, String s) throws IOException {
+        byte[] b = s.getBytes("ascii");
+        stream.insert(b, 0, b.length);
+    }
+    /** Hook for rewriting the request line (passed without CR LF); the default returns it unchanged. */
+    protected String processRequest(String request) throws IOException {
+        return request;
+    }
+    /** Hook for rewriting a header value; returning null removes the header line. The default keeps the value. */
+    protected String processHeader(String name, String value) throws IOException {
+        return value;
+    }
+}

Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Pipeline.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Pipeline.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Pipeline.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Pipeline.java Fri Jan  2 16:43:14 2009
@@ -0,0 +1,324 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+
+/**
+ * Class that sends a byte stream through a sequence of filters.
+ * Each filter receives as input the (potentially modified) output
+ * of the previous filter in the chain.
+ * <p>
+ * The pipeline works in push
+ * mode and is non-blocking. This distinguishes it from a chain
+ * of (Filter)InputStreams. Also, a filter is not required to process
+ * all received data on each invocation (in which case it gets a chance to process
+ * the unprocessed data during the next invocation). This is useful when
+ * the filter requires more lookahead than is available. This
+ * distinguishes a pipeline from a chain of (Filter)OutputStreams (where each
+ * OutputStream is responsible itself for buffering unprocessed data).
+ * <p>
+ * Finally, the implementation optimizes buffer allocation and minimizes
+ * array copy operations.
+ */
+public class Pipeline {
+    private class StreamImpl implements Stream { // one stage of the pipeline: the Stream handed to a single filter
+        private final StreamFilter filter; // the filter driven by this stage
+        private StreamImpl next; // following stage, or null if this is the last one
+        private byte[] inBuffer; // buffer holding the input not yet processed by the filter
+        private int inOffset; // start of the unprocessed input in inBuffer
+        private int inLength; // number of unprocessed input bytes
+        private boolean lastBuffer; // true while the filter sees the final data of the stream
+        private boolean preserve; // true if inBuffer must neither be written to nor released to the pool
+        private int skipOffset; // start of the skipped region of inBuffer not yet forwarded
+        private int skipLength; // length of that pending skipped region
+        private byte[] outBuffer; // collects single inserted bytes until they are forwarded
+        private int outLength; // number of pending bytes in outBuffer
+        private boolean eosSignalled; // Set to true if the next filter has been invoked with eos == true
+        
+        public StreamImpl(StreamFilter filter) {
+            this.filter = filter;
+        }
+        
+        public void setNext(StreamImpl next) {
+            this.next = next;
+        }
+        /** Pushes a chunk into this stage, invokes the filter on it and forwards whatever the filter skipped or inserted. */
+        public void invoke(byte[] buffer, int offset, int length, boolean eos, boolean preserve) throws IOException {
+            while (length > 0) { // NOTE(review): not entered when length == 0, so readFrom's end-of-stream call never invokes the filter
+                if (inLength > 0) {
+                    int c = fillBuffer(buffer, offset, length);
+                    if (c == 0) {
+                        throw new IOException("Pipeline buffer overflow");
+                    }
+                    offset += c;
+                    length -= c;
+                } else {
+                    setBuffer(buffer, offset, length, preserve);
+                    offset += length;
+                    length = 0;
+                }
+                this.lastBuffer = eos && length == 0; // only the final slice of the last chunk counts as last
+                filter.invoke(this);
+            }
+            flushSkip(eos);
+            flushOutput(eos);
+            if (inLength > 0) { // the filter left some input unprocessed
+                if (eos) {
+                    throw new IllegalStateException("The filter didn't consume all its input");
+                }
+                if (this.preserve) {
+                    compactBuffer(); // copy the leftover out of the preserved buffer so it is kept for the next call
+                }
+            } else if (inBuffer != null) { // everything consumed: drop the buffer (back to the pool unless preserved)
+                if (!this.preserve) {
+                    releaseBuffer(inBuffer);
+                }
+                inBuffer = null;
+                inOffset = 0;
+                inLength = 0;
+            }
+        }
+        /** Makes the given region the new input; the previous buffer must be fully consumed and is released unless preserved. */
+        private void setBuffer(byte[] buffer, int offset, int length, boolean preserve) throws IOException {
+            flushSkip(false);
+            if (inBuffer != null) {
+                if (inLength > 0) {
+                    throw new IllegalStateException();
+                }
+                if (!this.preserve) {
+                    releaseBuffer(inBuffer);
+                }
+            }
+            inBuffer = buffer;
+            inOffset = offset;
+            inLength = length;
+            this.preserve = preserve;
+        }
+        /** Appends as much of the given data as fits to the unprocessed input and returns the number of bytes copied. */
+        private int fillBuffer(byte[] buffer, int offset, int length) throws IOException {
+            if (!preserve && length <= inBuffer.length-inOffset-inLength) { // enough room behind the unprocessed data
+                System.arraycopy(buffer, offset, inBuffer, inOffset+inLength, length);
+                inLength += length;
+                return length;
+            } else {
+                compactBuffer();
+                int c = Math.min(length, inBuffer.length-inOffset-inLength);
+                System.arraycopy(buffer, offset, inBuffer, inLength, c); // inOffset is 0 after compactBuffer()
+                inLength += c;
+                return c;
+            }
+        }
+        /** Moves the unprocessed input to offset 0, copying it into a buffer from allocateBuffer() if inBuffer is preserved. */
+        private void compactBuffer() throws IOException {
+            flushSkip(false);
+            byte[] src = inBuffer;
+            if (preserve) {
+                inBuffer = allocateBuffer(); // NOTE(review): assumes inLength fits in a pool buffer - confirm for large inserted arrays
+                preserve = false;
+            }
+            System.arraycopy(src, inOffset, inBuffer, 0, inLength);
+            inOffset = 0;
+        }
+        /** Passes data to the next stage; on the last stage a buffer that is not preserved is returned to the pool. */
+        private void invokeNext(byte[] buffer, int offset, int length, boolean eos, boolean preserve) throws IOException {
+            if (eos && eosSignalled) {
+                throw new IllegalStateException();
+            }
+            if (next != null) {
+                next.invoke(buffer, offset, length, eos, preserve);
+            } else if (!preserve) {
+                releaseBuffer(buffer);
+            }
+            eosSignalled = eos;
+        }
+        /** Forwards the pending skipped region of the input buffer, if any, to the next stage. */
+        private void flushSkip(boolean eos) throws IOException {
+            if (skipLength > 0) {
+                if (outLength > 0) {
+                    throw new IllegalStateException();
+                }
+                if (inLength == 0 && !preserve) { // buffer fully consumed and not preserved: hand it over
+                    invokeNext(inBuffer, skipOffset, skipLength, eos, false);
+                    inBuffer = null;
+                    inOffset = 0;
+                } else { // buffer is still needed here (or preserved): the next stage must not release it
+                    invokeNext(inBuffer, skipOffset, skipLength, eos, true);
+                }
+                skipLength = 0;
+            }
+        }
+        /** Forwards the pending inserted bytes, if any, to the next stage and gives up outBuffer. */
+        private void flushOutput(boolean eos) throws IOException {
+            if (outLength > 0) {
+                if (skipLength > 0) {
+                    throw new IllegalStateException();
+                }
+                invokeNext(outBuffer, 0, outLength, eos, false);
+                outBuffer = null;
+                outLength = 0;
+            }
+        }
+        
+        public int available() {
+            return inLength;
+        }
+
+        public boolean isEndOfStream() {
+            return lastBuffer && inLength == 0;
+        }
+
+        public int get() {
+            return get(0);
+        }
+
+        public int get(int lookahead) {
+            if (lookahead < 0) {
+                throw new ArrayIndexOutOfBoundsException();
+            } else if (lookahead >= inLength) {
+                if (lastBuffer) {
+                    return -1;
+                } else {
+                    throw new ArrayIndexOutOfBoundsException();
+                }
+            } else {
+                return inBuffer[inOffset+lookahead]; // NOTE(review): signed byte, so 0xFF is returned as -1 like end of stream
+            }
+        }
+        
+        public int read(byte[] buffer, int offset, int length) {
+            int c = Math.min(length, inLength);
+            System.arraycopy(inBuffer, inOffset, buffer, offset, c);
+            return c;
+        }
+
+        public void readAll(OutputStream out) throws IOException {
+            out.write(inBuffer, inOffset, inLength);
+        }
+
+        public byte discard() {
+            byte b = inBuffer[inOffset];
+            discard(1);
+            return b;
+        }
+
+        public void discard(int len) {
+            if (len < 0 || len > inLength) {
+                throw new ArrayIndexOutOfBoundsException();
+            }
+            inOffset += len; // simply advance: discarded bytes are never forwarded
+            inLength -= len;
+        }
+
+        public void insert(byte b) throws IOException {
+            flushSkip(false);
+            if (outLength > 0 && outLength == outBuffer.length) { // outBuffer is full: forward it first
+                flushOutput(false);
+            }
+            if (outBuffer == null) {
+                outBuffer = allocateBuffer();
+            }
+            outBuffer[outLength++] = b;
+        }
+        
+        public void insert(byte[] buffer, int offset, int length) throws IOException {
+            flushSkip(false);
+            flushOutput(false);
+            invokeNext(buffer, offset, length, false, true); // the caller's array is forwarded as is, with preserve == true
+        }
+
+        public byte skip() throws IOException {
+            byte b = inBuffer[inOffset];
+            skip(1);
+            return b;
+        }
+
+        public void skip(int len) throws IOException {
+            if (len < 0 || len > inLength) {
+                throw new ArrayIndexOutOfBoundsException();
+            }
+            flushOutput(false);
+            if (skipLength == 0) {
+                skipOffset = inOffset;
+                skipLength = len;
+            } else if (skipOffset+skipLength != inOffset) {
+                // This means that some bytes have been discarded after
+                // the last skip operation.
+                flushSkip(false);
+                skipOffset = inOffset;
+                skipLength = len;
+            } else {
+                skipLength += len; // contiguous with the pending skipped region: just extend it
+            }
+            inOffset += len;
+            inLength -= len;
+        }
+
+        public void skipAll() throws IOException {
+            skip(inLength);
+        }
+    }
+    
+    private final int bufferSize; // size of the buffers created by allocateBuffer()
+    private final LinkedList buffers = new LinkedList(); // pool of released byte[] buffers available for reuse
+    private StreamImpl first; // head of the filter chain, null until a filter is added
+    private StreamImpl last; // tail of the filter chain
+    
+    public Pipeline(int bufferSize) {
+        this.bufferSize = bufferSize;
+    }
+    
+    public Pipeline() {
+        this(4096);
+    }
+    /** Returns a buffer from the pool if one is available, otherwise a new buffer of bufferSize bytes. */
+    byte[] allocateBuffer() {
+        return buffers.isEmpty() ? new byte[bufferSize] : (byte[])buffers.removeFirst();
+    }
+    /** Returns a buffer to the pool for later reuse; null is rejected. */
+    void releaseBuffer(byte[] buffer) {
+        if (buffer == null) {
+            throw new IllegalArgumentException();
+        }
+        buffers.add(buffer);
+    }
+    /** Appends a filter to the end of the pipeline. */
+    public void addFilter(StreamFilter filter) {
+        StreamImpl node = new StreamImpl(filter);
+        if (first == null) {
+            first = node;
+        }
+        if (last != null) {
+            last.setNext(node);
+        }
+        last = node;
+    }
+    /** Reads one chunk from the input stream, pushes it into the first stage and returns the result of in.read (-1 at end). */
+    public int readFrom(InputStream in) throws IOException {
+        byte[] buffer = allocateBuffer();
+        int read = in.read(buffer);
+        if (read == -1) {
+            first.invoke(buffer, 0, 0, true, false);
+        } else {
+            first.invoke(buffer, 0, read, false, false);
+        }
+        return read;
+    }
+}

Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/RequestLineExtractor.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/RequestLineExtractor.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/RequestLineExtractor.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/RequestLineExtractor.java Fri Jan  2 16:43:14 2009
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+
+/**
+ * Filter that captures the first line of the stream, limited to a maximum number of bytes,
+ * and reports it once through {@link #done(String)}. All data is passed on unmodified.
+ */
+public abstract class RequestLineExtractor implements StreamFilter {
+    private final byte[] buffer;
+    private int length;
+    private boolean done;
+    
+    public RequestLineExtractor(int maxLength) {
+        this.buffer = new byte[maxLength];
+    }
+    
+    public void invoke(Stream stream) throws IOException {
+        if (!done) {
+            while (stream.available() > 0) {
+                byte current = stream.skip();
+                if (current != '\n') {
+                    buffer[length++] = current;
+                }
+                if (current == '\n' || length == buffer.length) {
+                    done = true;
+                    break;
+                }
+            }
+            if (!done) {
+                return; // line still incomplete: continue with the next invocation
+            }
+            stream.skipAll();
+            done(new String(buffer, 0, length, "ascii"));
+        } else {
+            stream.skipAll();
+        }
+    }
+    /** Receives the captured line, decoded as ASCII and without the terminating line feed. */
+    protected abstract void done(String requestLine);
+}

Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Stream.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Stream.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Stream.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Stream.java Fri Jan  2 16:43:14 2009
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Interface used by a filter to access the stream flowing through a pipeline.
+ */
+public interface Stream {
+    /**
+     * Get the number of bytes currently available in the stream.
+     * 
+     * @return the number of available bytes
+     */
+    int available();
+    
+    /**
+     * Check if the end of the stream has been reached.
+     * 
+     * @return true if the end of the stream has been reached
+     */
+    boolean isEndOfStream();
+    
+    /**
+     * Get the byte at the current position in the stream.
+     * Calling this method will not modify the current position in
+     * the stream.
+     * 
+     * @return the byte at the current position in the stream
+     *         or -1 if the end of the stream has been reached
+     * @throws ArrayIndexOutOfBoundsException if the byte at the
+     *         current position is not yet available. This is the
+     *         case if {@link #available()} returns 0 and
+     *         {@link #isEndOfStream()} returns false.
+     */
+    int get();
+    
+    /**
+     * Get the byte at a given distance from the current position in
+     * the stream.
+     * 
+     * @param lookahead the distance from the current position
+     * @return the byte at the given position, or -1 if the position
+     *         is past the end of the stream
+     * @throws ArrayIndexOutOfBoundsException if the byte at the
+     *         given position is not yet available
+     */
+    int get(int lookahead);
+    
+    /**
+     * Read data from the stream into a byte array, starting from the
+     * current position in the stream.
+     * Calling this method will not modify the current position in
+     * the stream.
+     * 
+     * @param buffer the buffer into which the data is read
+     * @param offset the start offset in array <code>buffer</code>
+     *               at which the data is written
+     * @param length the maximum number of bytes to read
+     * @return the total number of bytes read into the buffer
+     */
+    int read(byte[] buffer, int offset, int length);
+    
+    /**
+     * Read all currently available data from the stream and
+     * copy it to an {@link OutputStream} object.
+     * Calling this method will not modify the current position in
+     * the stream.
+     * 
+     * @param out the output stream to write the data to
+     * @throws IOException if an I/O error occurred when writing
+     *                     to the output stream
+     */
+    void readAll(OutputStream out) throws IOException;
+    
+    /**
+     * Discard the byte at the current position in the stream.
+     * This method increments the current position without copying
+     * the byte to the next filter in the pipeline.
+     * 
+     * @return the byte at the current position in the stream
+     */
+    byte discard();
+    
+    /**
+     * Discard a given number of bytes from the stream, starting
+     * at the current position.
+     * 
+     * @param len the number of bytes to discard
+     */
+    void discard(int len);
+    
+    /**
+     * Insert a byte at the current position in the stream.
+     * The logical position after invocation of this method will
+     * be just after the inserted byte, i.e. the inserted byte can't
+     * be read, discarded or skipped.
+     * 
+     * @param b the byte to insert
+     * @throws IOException if an I/O error occurs while data is passed on to the next filter
+     */
+    void insert(byte b) throws IOException;
+    
+    /**
+     * Insert a byte sequence at the current position in the stream.
+     * The logical position after invocation of this method will
+     * be just after the last inserted byte.
+     * 
+     * @param buffer a byte array containing the sequence to be inserted in the stream
+     * @param offset the start offset in the byte array
+     * @param length the number of bytes to insert
+     * @throws IOException if an I/O error occurs while data is passed on to the next filter
+     */
+    void insert(byte[] buffer, int offset, int length) throws IOException;
+    
+    /**
+     * Skip the byte at the current position in the stream.
+     * This will increment the current position and copy the byte
+     * to the next filter.
+     * 
+     * @return the byte at the current position in the stream 
+     * @throws IOException if an I/O error occurs while data is passed on to the next filter
+     */
+    byte skip() throws IOException;
+    
+    /**
+     * Skip a given number of bytes in the stream, starting
+     * from the current position.
+     * 
+     * @param len the number of bytes to skip
+     * @throws IOException if an I/O error occurs while data is passed on to the next filter
+     */
+    void skip(int len) throws IOException;
+    
+    /**
+     * Skip all the bytes currently available in the stream.
+     * The instruction <code>s.skipAll()</code> is equivalent to
+     * <code>s.skip(s.available())</code>.
+     * 
+     * @throws IOException if an I/O error occurs while data is passed on to the next filter
+     */
+    void skipAll() throws IOException;
+}

Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/StreamFilter.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/StreamFilter.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/StreamFilter.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/StreamFilter.java Fri Jan  2 16:43:14 2009
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+
+/**
+ * A filter acting on a stream.
+ */
+public interface StreamFilter {
+    /**
+     * Invoke the filter. This method is called by {@link Pipeline}
+     * when data is available for processing. The implementation can
+     * modify the stream by discarding bytes from the stream and
+     * inserting new data. If it doesn't wish to modify the stream,
+     * it should skip the relevant parts, so that they will be processed
+     * by the next filter in the pipeline.
+     * <p>
+     * An implementation is not required to process (skip or discard)
+     * all the data available on each invocation. If after the invocation
+     * of this method {@link Stream#available()} is non zero, the remaining
+     * (unprocessed) data will be available again during the next invocation
+     * of the filter. 
+     * 
+     * @param stream the stream to process
+     * @throws IOException if an I/O error occurs while processing the stream
+     */
+    void invoke(Stream stream) throws IOException;
+}

Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Tee.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Tee.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Tee.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Tee.java Fri Jan  2 16:43:14 2009
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A filter that copies all the data in the stream to a given {@link OutputStream}.
+ * The data is written to the output stream before being processed by filters further
+ * down the pipeline. The target output stream can be specified using the
+ * {@link #Tee(OutputStream)} constructor or later using the
+ * {@link #setOutputStream(OutputStream)} method; once set, it cannot be replaced.
+ * If data is submitted to the filter before the output stream has been set, it
+ * will be held back until {@link #setOutputStream(OutputStream)} has been called
+ * and the filter is invoked again.
+ */
+public class Tee implements StreamFilter {
+    private OutputStream out; // target of the copy; null until a stream has been set
+    
+    public Tee() { // creates a tee without a target; supply one later via setOutputStream
+    }
+    
+    public Tee(OutputStream out) { // creates a tee that copies to the given stream
+        setOutputStream(out); // passing null leaves the target unset, as with Tee()
+    }
+
+    public void setOutputStream(OutputStream out) { // sets the target; allowed only while none is set
+        if (this.out != null) { // a non-null target is already in place and must not be replaced
+            throw new IllegalStateException("The output stream has already been set");
+        }
+        this.out = out;
+    }
+
+    public void invoke(Stream stream) throws IOException { // copies pending data to the target, if any
+        if (out != null) { // without a target nothing is consumed, so the data is offered again later
+            stream.readAll(out); // presumably writes all available data to out without consuming it -- TODO confirm against Stream
+            stream.skipAll(); // presumably hands the same data unchanged to the next filter -- TODO confirm against Stream
+        }
+    }
+}

Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/XmlFormatFilter.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/XmlFormatFilter.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/XmlFormatFilter.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/XmlFormatFilter.java Fri Jan  2 16:43:14 2009
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+
+/**
+ * Filter that re-indents XML data: inserts a line break plus indentation before tags and drops original line breaks.
+ */
+public class XmlFormatFilter implements StreamFilter {
+    private final int tabWidth; // number of spaces inserted per indentation level
+    private int nextIndent = -1; // depth of the innermost open element; -1 while no tag has been opened
+    private int previousIndent = -1; // value nextIndent had just before the most recent tag boundary
+    
+    public XmlFormatFilter(int tabWidth) { // tabWidth: spaces per indentation level
+        this.tabWidth = tabWidth;
+    }
+
+    public void invoke(Stream stream) throws IOException { // scans the available data one byte at a time
+        try {
+            boolean inXML = false; // local, so it starts out false again on every invocation
+            while (stream.available() > 0) {
+                int thisIndent = -1; // -1 means: insert no line break/indentation at this position
+                if (stream.get(0) == '<' && stream.get(1) != '/') { // "<x": opening tag (also matches "<?" and "<!")
+                    previousIndent = nextIndent++;
+                    thisIndent = nextIndent; // an opening tag always starts at its own depth
+                    inXML = true;
+                } else if (stream.get(0) == '<' && stream.get(1) == '/') { // "</": closing tag
+                    if (previousIndent > nextIndent) { // true only if the previous boundary was also a close or "/>"
+                        thisIndent = nextIndent;
+                    }
+                    previousIndent = nextIndent--;
+                    inXML = true;
+                } else if (stream.get(0) == '/' && stream.get(1) == '>') { // "/>": end of a self-closing tag
+                    previousIndent = nextIndent--;
+                    inXML = true;
+                }
+                if (thisIndent != -1) {
+                    if (thisIndent > 0) { // no line break is inserted in front of a depth-0 tag
+                        stream.insert((byte) '\n');
+                    }
+                    for (int i = tabWidth * thisIndent; i > 0; i--) { // tabWidth spaces per level
+                        stream.insert((byte) ' ');
+                    }
+                }
+                if (!inXML || (stream.get(0) != '\n' && stream.get(0) != '\r')) { // pass through everything except line breaks seen after a tag
+                    stream.skip();
+                } else {
+                    stream.discard(); // drop the original CR/LF; line breaks are re-created by the inserts above
+                }
+            }
+        } catch (ArrayIndexOutOfBoundsException ex) { // NOTE(review): presumably thrown by stream.get(1) when the lookahead byte is not available yet -- confirm against Stream
+            return; // stop here; per the StreamFilter contract unprocessed data is offered again on the next invocation
+        }
+    }
+}

Modified: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/Connection.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/Connection.java?rev=730880&r1=730879&r2=730880&view=diff
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/Connection.java (original)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/Connection.java Fri Jan  2 16:43:14 2009
@@ -17,16 +17,13 @@
 package org.apache.ws.commons.tcpmon.eclipse.ui;
 
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.Socket;
 
 import org.eclipse.swt.SWT;
 import org.eclipse.swt.widgets.TableItem;
 import org.eclipse.swt.widgets.Text;
-import org.apache.ws.commons.tcpmon.SlowLinkSimulator;
 import org.apache.ws.commons.tcpmon.TCPMonBundle;
 import org.apache.ws.commons.tcpmon.core.AbstractConnection;
-import org.apache.ws.commons.tcpmon.core.AbstractSocketRR;
 
 /**
  * a connection listens to a single current connection
@@ -109,21 +106,6 @@
         outputWriter = new TextWidgetWriter(outputText);
     }
 
-    protected AbstractSocketRR createInputSocketRR(Socket inSocket, InputStream inputStream,
-            Socket outSocket, OutputStream outputStream, boolean format,
-            SlowLinkSimulator slowLink) {
-        return new SocketRR(this, inSocket, inputStream, outSocket, outputStream,
-                format, listener.connectionTable,
-                listener.connections.indexOf(this) + 1, slowLink, inputWriter);
-    }
-
-    protected AbstractSocketRR createOutputSocketRR(Socket outSocket, InputStream inputStream,
-            Socket inSocket, OutputStream outputStream, boolean format,
-            SlowLinkSimulator slowLink) {
-        return new SocketRR(this, outSocket, inputStream, inSocket, outputStream,
-                format, null, 0, slowLink, outputWriter);
-    }
-
     private void setValue(final int column, final String value) {
         final int index = listener.connections.indexOf(this);
         if (index >= 0) {