You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by ve...@apache.org on 2009/01/03 01:43:15 UTC
svn commit: r730880 - in
/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon:
./ core/ core/filter/ eclipse/ui/
Author: veithen
Date: Fri Jan 2 16:43:14 2009
New Revision: 730880
URL: http://svn.apache.org/viewvc?rev=730880&view=rev
Log:
TCPMon: Refactored the code responsible for request/response processing ("Host" header rewriting, plain HTTP <-> proxy HTTP switching, XML formatting, etc.) to make it more readable, manageable and reusable. The new code is organized around a stream/pipeline/filter metaphor. The complexity of stream processing (buffering, rewriting, lookahead, etc.) is hidden in the new Pipeline class, and a set of distinct filters replaces the processing code previously contained in AbstractConnection and AbstractSocketRR.
The modifications should also solve WSCOMMONS-80, WSCOMMONS-380 and WSCOMMONS-425.
Added:
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/SocketRR.java
- copied, changed from r730497, webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractSocketRR.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/CharsetDecoderFilter.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpHeaderRewriter.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyClientHandler.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyServerHandler.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpRequestFilter.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Pipeline.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/RequestLineExtractor.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Stream.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/StreamFilter.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Tee.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/XmlFormatFilter.java
Removed:
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SocketRR.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractSocketRR.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/SocketRR.java
Modified:
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/Connection.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SlowLinkSimulator.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractConnection.java
webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/Connection.java
Modified: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/Connection.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/Connection.java?rev=730880&r1=730879&r2=730880&view=diff
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/Connection.java (original)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/Connection.java Fri Jan 2 16:43:14 2009
@@ -21,10 +21,8 @@
import javax.swing.ListSelectionModel;
import org.apache.ws.commons.tcpmon.core.AbstractConnection;
-import org.apache.ws.commons.tcpmon.core.AbstractSocketRR;
import java.io.InputStream;
-import java.io.OutputStream;
import java.net.Socket;
/**
@@ -126,21 +124,6 @@
outputWriter = new JTextAreaWriter(outputText);
}
- protected AbstractSocketRR createInputSocketRR(Socket inSocket, InputStream inputStream,
- Socket outSocket, OutputStream outputStream, boolean format,
- SlowLinkSimulator slowLink) {
- return new SocketRR(this, inSocket, inputStream, outSocket, outputStream,
- format, listener.tableModel,
- listener.connections.indexOf(this) + 1, slowLink, inputWriter);
- }
-
- protected AbstractSocketRR createOutputSocketRR(Socket outSocket, InputStream inputStream,
- Socket inSocket, OutputStream outputStream, boolean format,
- SlowLinkSimulator slowLink) {
- return new SocketRR(this, outSocket, inputStream, inSocket, outputStream,
- format, null, 0, slowLink, outputWriter);
- }
-
private void setValue(int column, String value) {
int index = listener.connections.indexOf(this);
if (index >= 0) {
Modified: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SlowLinkSimulator.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SlowLinkSimulator.java?rev=730880&r1=730879&r2=730880&view=diff
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SlowLinkSimulator.java (original)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/SlowLinkSimulator.java Fri Jan 2 16:43:14 2009
@@ -16,10 +16,15 @@
package org.apache.ws.commons.tcpmon;
+import java.io.IOException;
+
+import org.apache.ws.commons.tcpmon.core.filter.Stream;
+import org.apache.ws.commons.tcpmon.core.filter.StreamFilter;
+
/**
* class to simulate slow connections by slowing down the system
*/
-public class SlowLinkSimulator {
+public class SlowLinkSimulator implements StreamFilter {
/**
* Field delayBytes
@@ -104,6 +109,11 @@
}
}
+ public void invoke(Stream stream) throws IOException {
+ pump(stream.available());
+ stream.skipAll();
+ }
+
/**
* get the current byte count
*
Modified: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractConnection.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractConnection.java?rev=730880&r1=730879&r2=730880&view=diff
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractConnection.java (original)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractConnection.java Fri Jan 2 16:43:14 2009
@@ -18,6 +18,14 @@
import org.apache.ws.commons.tcpmon.SlowLinkSimulator;
import org.apache.ws.commons.tcpmon.TCPMonBundle;
+import org.apache.ws.commons.tcpmon.core.filter.CharsetDecoderFilter;
+import org.apache.ws.commons.tcpmon.core.filter.HttpHeaderRewriter;
+import org.apache.ws.commons.tcpmon.core.filter.HttpProxyClientHandler;
+import org.apache.ws.commons.tcpmon.core.filter.HttpProxyServerHandler;
+import org.apache.ws.commons.tcpmon.core.filter.Pipeline;
+import org.apache.ws.commons.tcpmon.core.filter.RequestLineExtractor;
+import org.apache.ws.commons.tcpmon.core.filter.Tee;
+import org.apache.ws.commons.tcpmon.core.filter.XmlFormatFilter;
import java.io.IOException;
import java.io.InputStream;
@@ -26,7 +34,6 @@
import java.io.StringWriter;
import java.io.Writer;
import java.net.Socket;
-import java.net.URL;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
@@ -65,7 +72,7 @@
/**
* Field outSocket
*/
- Socket outSocket = null;
+ volatile Socket outSocket = null;
/**
* Field clientThread
@@ -80,12 +87,12 @@
/**
* Field rr1
*/
- AbstractSocketRR rr1 = null;
+ SocketRR rr1 = null;
/**
* Field rr2
*/
- AbstractSocketRR rr2 = null;
+ SocketRR rr2 = null;
/**
* Field inputStream
@@ -155,7 +162,6 @@
int targetPort = config.getTargetPort();
InputStream tmpIn1 = inputStream;
OutputStream tmpOut1 = null;
- InputStream tmpIn2 = null;
OutputStream tmpOut2 = null;
if (tmpIn1 == null) {
tmpIn1 = inSocket.getInputStream();
@@ -163,157 +169,61 @@
if (inSocket != null) {
tmpOut1 = inSocket.getOutputStream();
}
- String bufferedData = null;
- StringBuffer buf = null;
- if (config.isProxy() || (HTTPProxyHost != null)) {
-
- // Check if we're a proxy
- byte[] b = new byte[1];
- buf = new StringBuffer();
- String s;
- for (; ;) {
- int len;
- len = tmpIn1.read(b, 0, 1);
- if (len == -1) {
- break;
- }
- s = new String(b);
- buf.append(s);
- if (b[0] != '\n') {
- continue;
- }
- break;
- }
- bufferedData = buf.toString();
- inputWriter.write(bufferedData);
- if (bufferedData.startsWith("GET ")
- || bufferedData.startsWith("POST ")
- || bufferedData.startsWith("PUT ")
- || bufferedData.startsWith("DELETE ")) {
- int start, end;
- URL url;
- start = bufferedData.indexOf(' ') + 1;
- while (bufferedData.charAt(start) == ' ') {
- start++;
- }
- end = bufferedData.indexOf(' ', start);
- String urlString = bufferedData.substring(start, end);
- if (urlString.charAt(0) == '/') {
- urlString = urlString.substring(1);
- }
- if (config.isProxy()) {
- url = new URL(urlString);
- targetHost = url.getHost();
- targetPort = url.getPort();
- if (targetPort == -1) {
- targetPort = 80;
- }
- setOutHost(targetHost);
- bufferedData = bufferedData.substring(0, start)
- + url.getFile()
- + bufferedData.substring(end);
- } else {
- url = new URL("http://" + targetHost + ":"
- + targetPort + "/" + urlString);
- setOutHost(targetHost);
- bufferedData = bufferedData.substring(0, start)
- + url.toExternalForm()
- + bufferedData.substring(end);
- targetHost = HTTPProxyHost;
- targetPort = HTTPProxyPort;
- }
- }
+
+ Pipeline requestPipeline = new Pipeline();
+ requestPipeline.addFilter(new RequestLineExtractor(50) {
+ protected void done(String requestLine) {
+ setRequest(requestLine);
+ }
+ });
+ if (config.isProxy()) {
+ requestPipeline.addFilter(new HttpProxyServerHandler() {
+ protected void handleConnection(String host, int port) throws IOException {
+ outSocket = new Socket(host, port);
+ }
+ });
+ } else if (HTTPProxyHost != null) {
+ requestPipeline.addFilter(new HttpProxyClientHandler(targetHost, targetPort));
+ outSocket = new Socket(HTTPProxyHost, HTTPProxyPort);
} else {
- //
- // Change Host: header to point to correct host
- //
- byte[] b1 = new byte[1];
- buf = new StringBuffer();
- String s1;
- String lastLine = null;
- for (; ;) {
- int len;
- len = tmpIn1.read(b1, 0, 1);
- if (len == -1) {
- break;
- }
- s1 = new String(b1);
- buf.append(s1);
- if (b1[0] != '\n') {
- continue;
- }
-
- // we have a complete line
- String line = buf.toString();
- buf.setLength(0);
-
- // check to see if we have found Host: header
- if (line.startsWith("Host: ")) {
-
- // we need to update the hostname to target host
- String newHost = "Host: " + targetHost + ":"
- + targetPort + "\r\n";
- bufferedData = bufferedData.concat(newHost);
- break;
- }
-
- // add it to our headers so far
- if (bufferedData == null) {
- bufferedData = line;
- } else {
- bufferedData = bufferedData.concat(line);
- }
-
- // failsafe
- if (line.equals("\r\n")) {
- break;
- }
- if ("\n".equals(lastLine) && line.equals("\n")) {
- break;
- }
- lastLine = line;
- }
- if (bufferedData != null) {
- inputWriter.write(bufferedData);
- int idx = (bufferedData.length() < 50)
- ? bufferedData.length()
- : 50;
- s1 = bufferedData.substring(0, idx);
- int i = s1.indexOf('\n');
- if (i > 0) {
- s1 = s1.substring(0, i - 1);
- }
- s1 = s1 + " "
- + " ";
- s1 = s1.substring(0, 51);
- setRequest(s1);
- }
+ requestPipeline.addFilter(new HttpHeaderRewriter("Host", targetHost + ":" + targetPort));
+ outSocket = new Socket(targetHost, targetPort);
}
- if (targetPort == -1) {
- targetPort = 80;
+ requestPipeline.addFilter(config.getSlowLink());
+ Tee requestTee = new Tee();
+ requestPipeline.addFilter(requestTee);
+ if (config.isXmlFormat()) {
+ requestPipeline.addFilter(new XmlFormatFilter(3));
+ }
+ requestPipeline.addFilter(new CharsetDecoderFilter(inputWriter));
+
+ // If we act as a proxy, we first need to read the start of the request before
+ // the outSocket is available.
+ while (outSocket == null) {
+ requestPipeline.readFrom(tmpIn1);
}
- outSocket = new Socket(targetHost, targetPort);
- tmpIn2 = outSocket.getInputStream();
+
tmpOut2 = outSocket.getOutputStream();
- SlowLinkSimulator slowLink = config.getSlowLink();
- if (bufferedData != null) {
- byte[] b = bufferedData.getBytes();
- tmpOut2.write(b);
- slowLink.pump(b.length);
+ requestTee.setOutputStream(tmpOut2);
+
+ Pipeline responsePipeline = new Pipeline();
+ responsePipeline.addFilter(new SlowLinkSimulator(config.getSlowLink()));
+ if (tmpOut1 != null) {
+ responsePipeline.addFilter(new Tee(tmpOut1));
}
- boolean format = config.isXmlFormat();
-
+ if (config.isXmlFormat()) {
+ responsePipeline.addFilter(new XmlFormatFilter(3));
+ }
+ responsePipeline.addFilter(new CharsetDecoderFilter(outputWriter));
+
// this is the channel to the endpoint
- rr1 = createInputSocketRR(inSocket, tmpIn1, outSocket, tmpOut2,
- format, slowLink);
-
- // create the response slow link from the inbound slow link
- SlowLinkSimulator responseLink =
- new SlowLinkSimulator(slowLink);
+ rr1 = new SocketRR(this, inSocket, tmpIn1, outSocket, tmpOut2, requestPipeline);
// this is the channel from the endpoint
- rr2 = createOutputSocketRR(outSocket, tmpIn2, inSocket, tmpOut1,
- format, responseLink);
+ rr2 = new SocketRR(this, outSocket, outSocket.getInputStream(), inSocket, tmpOut1, responsePipeline);
+
+ rr1.start();
+ rr2.start();
while ((rr1 != null) || (rr2 != null)) {
@@ -402,12 +312,6 @@
}
protected abstract void init(String time, String fromHost, String targetHost);
- protected abstract AbstractSocketRR createInputSocketRR(Socket inSocket,
- InputStream inputStream, Socket outSocket, OutputStream outputStream, boolean format,
- SlowLinkSimulator slowLink);
- protected abstract AbstractSocketRR createOutputSocketRR(Socket outSocket,
- InputStream inputStream, Socket inSocket, OutputStream outputStream, boolean format,
- SlowLinkSimulator slowLink);
protected abstract void setOutHost(String outHost);
protected abstract void setState(String state);
protected abstract void setRequest(String request);
Copied: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/SocketRR.java (from r730497, webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractSocketRR.java)
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/SocketRR.java?p2=webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/SocketRR.java&p1=webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractSocketRR.java&r1=730497&r2=730880&rev=730880&view=diff
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/AbstractSocketRR.java (original)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/SocketRR.java Fri Jan 2 16:43:14 2009
@@ -16,18 +16,17 @@
package org.apache.ws.commons.tcpmon.core;
-import org.apache.ws.commons.tcpmon.SlowLinkSimulator;
+import org.apache.ws.commons.tcpmon.core.filter.Pipeline;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.Writer;
import java.net.Socket;
/**
* this class handles the pumping of data from the incoming socket to the
* outgoing socket
*/
-public abstract class AbstractSocketRR extends Thread {
+public class SocketRR extends Thread {
private final AbstractConnection connection;
/**
@@ -51,11 +50,6 @@
OutputStream out = null;
/**
- * Field xmlFormat
- */
- boolean xmlFormat;
-
- /**
* Field done
*/
volatile boolean done = false;
@@ -65,13 +59,8 @@
*/
volatile long elapsed = 0;
- /**
- * Field slowLink
- */
- SlowLinkSimulator slowLink;
-
- private final Writer writer;
-
+ private final Pipeline pipeline;
+
/**
* Constructor SocketRR
*
@@ -87,18 +76,15 @@
* @param type
* @param slowLink
*/
- public AbstractSocketRR(AbstractConnection connection, Socket inputSocket,
+ public SocketRR(AbstractConnection connection, Socket inputSocket,
InputStream inputStream, Socket outputSocket,
- OutputStream outputStream, boolean format,
- SlowLinkSimulator slowLink, Writer writer) {
+ OutputStream outputStream, Pipeline pipeline) {
this.connection = connection;
inSocket = inputSocket;
in = inputStream;
outSocket = outputSocket;
out = outputStream;
- xmlFormat = format;
- this.slowLink = slowLink;
- this.writer = writer;
+ this.pipeline = pipeline;
}
/**
@@ -119,152 +105,11 @@
*/
public void run() {
try {
- byte[] buffer = new byte[4096];
- byte[] tmpbuffer = new byte[8192];
- int saved = 0;
- int len;
- int i1, i2;
- int i;
- int reqSaved = 0;
- int tabWidth = 3;
- boolean atMargin = true;
- int thisIndent = -1, nextIndent = -1, previousIndent = -1;
- if (isSaveFirstLine()) {
- String tmpStr = getSavedFirstLine();
- if (!"".equals(tmpStr)) {
- reqSaved = tmpStr.length();
- }
+ // TODO: we should distinguish here between exceptions thrown when reading from the
+ // input stream and exceptions thrown in the pipeline
+ while (pipeline.readFrom(in) != -1) {
+ // Just loop
}
- long start = System.currentTimeMillis();
- a:
- for (; ;) {
-
- elapsed = System.currentTimeMillis() - start;
-
- if (done) {
- break;
- }
-
- // try{
- // len = in.available();
- // }catch(Exception e){len=0;}
- len = buffer.length;
-
- // Used to be 1, but if we block it doesn't matter
- // however 1 will break with some servers, including apache
- if (len == 0) {
- len = buffer.length;
- }
- if (saved + len > buffer.length) {
- len = buffer.length - saved;
- }
- int len1 = 0;
- while (len1 == 0) {
- try {
- len1 = in.read(buffer, saved, len);
- } catch (Exception ex) {
- if (done && (saved == 0)) {
- break a;
- }
- len1 = -1;
- break;
- }
- }
- len = len1;
- if ((len == -1) && (saved == 0)) {
- break;
- }
- if (len == -1) {
- done = true;
- }
-
- // No matter how we may (or may not) format it, send it
- // on unformatted - we don't want to mess with how its
- // sent to the other side, just how its displayed
- if ((out != null) && (len > 0)) {
- slowLink.pump(len);
- out.write(buffer, saved, len);
- }
-
- if (isSaveFirstLine() && (reqSaved < 50)) {
- String old = getSavedFirstLine();
- old = old + new String(buffer, saved, len);
- if (old.length() > 50) {
- old = old.substring(0, 50);
- }
- reqSaved = old.length();
- if ((i = old.indexOf('\n')) > 0) {
- old = old.substring(0, i - 1);
- reqSaved = 50;
- }
- setSavedFirstLine(old);
- }
-
-
- if (xmlFormat) {
-
- // Do XML Formatting
- boolean inXML = false;
- int bufferLen = saved;
- if (len != -1) {
- bufferLen += len;
- }
- i1 = 0;
- i2 = 0;
- saved = 0;
- for (; i1 < bufferLen; i1++) {
-
- // Except when we're at EOF, saved last char
- if ((len != -1) && (i1 + 1 == bufferLen)) {
- saved = 1;
- break;
- }
- thisIndent = -1;
- if ((buffer[i1] == '<')
- && (buffer[i1 + 1] != '/')) {
- previousIndent = nextIndent++;
- thisIndent = nextIndent;
- inXML = true;
- }
- if ((buffer[i1] == '<')
- && (buffer[i1 + 1] == '/')) {
- if (previousIndent > nextIndent) {
- thisIndent = nextIndent;
- }
- previousIndent = nextIndent--;
- inXML = true;
- }
- if ((buffer[i1] == '/')
- && (buffer[i1 + 1] == '>')) {
- previousIndent = nextIndent--;
- inXML = true;
- }
- if (thisIndent != -1) {
- if (thisIndent > 0) {
- tmpbuffer[i2++] = (byte) '\n';
- }
- for (i = tabWidth * thisIndent; i > 0; i--) {
- tmpbuffer[i2++] = (byte) ' ';
- }
- }
- atMargin = ((buffer[i1] == '\n')
- || (buffer[i1] == '\r'));
- if (!inXML || !atMargin) {
- tmpbuffer[i2++] = buffer[i1];
- }
- }
-
- writer.write(new String(tmpbuffer, 0, i2));
-
- // Shift saved bytes to the beginning
- for (i = 0; i < saved; i++) {
- buffer[i] = buffer[bufferLen - saved + i];
- }
- } else {
- writer.write(new String(buffer, 0, len));
- }
- }
-
} catch (Exception e) {
e.printStackTrace();
} finally {
@@ -322,8 +167,4 @@
e.printStackTrace();
}
}
-
- protected abstract boolean isSaveFirstLine();
- protected abstract String getSavedFirstLine();
- protected abstract void setSavedFirstLine(String value);
}
Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/CharsetDecoderFilter.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/CharsetDecoderFilter.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/CharsetDecoderFilter.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/CharsetDecoderFilter.java Fri Jan 2 16:43:14 2009
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+import java.io.Writer;
+
+/**
+ * Filter that decodes the stream to character data and sends it to a {@link Writer}.
+ */
+public class CharsetDecoderFilter implements StreamFilter {
+ private final Writer writer;
+
+ public CharsetDecoderFilter(Writer writer) {
+ this.writer = writer;
+ }
+
+ public void invoke(Stream stream) throws IOException {
+ StringBuffer buffer = new StringBuffer(stream.available());
+ while (stream.available() > 0) {
+ buffer.append((char)stream.skip());
+ }
+ writer.write(buffer.toString());
+ }
+}
Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpHeaderRewriter.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpHeaderRewriter.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpHeaderRewriter.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpHeaderRewriter.java Fri Jan 2 16:43:14 2009
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+/**
+ * Filter that replaces the value of a given HTTP header.
+ */
+public class HttpHeaderRewriter extends HttpRequestFilter {
+ private final String headerName;
+ private final String newValue;
+
+ public HttpHeaderRewriter(String headerName, String newValue) {
+ this.headerName = headerName;
+ this.newValue = newValue;
+ }
+
+ protected String processHeader(String name, String value) {
+ return headerName.equals(name) ? newValue : value;
+ }
+}
Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyClientHandler.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyClientHandler.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyClientHandler.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyClientHandler.java Fri Jan 2 16:43:14 2009
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+
+/**
+ * Filter that rewrites a plain HTTP request to an HTTP proxy request.
+ */
+public class HttpProxyClientHandler extends HttpRequestFilter {
+ private final String targetHost;
+ private final int targetPort;
+
+ public HttpProxyClientHandler(String targetHost, int targetPort) {
+ this.targetHost = targetHost;
+ this.targetPort = targetPort;
+ }
+
+ protected String processRequest(String request) throws IOException {
+ String[] parts = request.split(" ");
+ return parts[0] + " http://" + targetHost + ":" + targetPort + parts[1] + " " + parts[2];
+ }
+}
Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyServerHandler.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyServerHandler.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyServerHandler.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpProxyServerHandler.java Fri Jan 2 16:43:14 2009
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+import java.net.URL;
+
+/**
+ * Filter that rewrites an HTTP proxy request to a plain HTTP request.
+ */
+public abstract class HttpProxyServerHandler extends HttpRequestFilter {
+ protected String processRequest(String request) throws IOException {
+ String[] parts = request.split(" ");
+ URL url = new URL(parts[1]);
+ int port = url.getPort();
+ handleConnection(url.getHost(), port == -1 ? 80 : port);
+ return parts[0] + " " + url.getFile() + " " + parts[2];
+ }
+
+ protected abstract void handleConnection(String host, int port) throws IOException;
+}
Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpRequestFilter.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpRequestFilter.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpRequestFilter.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/HttpRequestFilter.java Fri Jan 2 16:43:14 2009
@@ -0,0 +1,123 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+
+/**
+ * Abstract filter that allows HTTP request rewriting.
+ */
+public abstract class HttpRequestFilter implements StreamFilter {
+ private static final int STATE_REQUEST = 0;
+ private static final int STATE_HEADER = 1;
+ private static final int STATE_DONE = 2;
+
+ private int state = STATE_REQUEST;
+
+ public void invoke(Stream stream) throws IOException {
+ while (stream.available() > 0) {
+ switch (state) {
+ case STATE_REQUEST: {
+ int eol = searchEndOfLine(stream);
+ if (eol == -1) {
+ // EOL not yet available; maybe next time...
+ return;
+ } else {
+ String orgRequest = getString(stream, 0, eol);
+ String request = processRequest(orgRequest);
+ if (request == orgRequest) {
+ stream.skip(eol+2);
+ } else {
+ stream.discard(eol);
+ insert(stream, request);
+ stream.skip(2);
+ }
+ state = STATE_HEADER;
+ }
+ break;
+ }
+ case STATE_HEADER: {
+ int eol = searchEndOfLine(stream);
+ if (eol == -1) {
+ return;
+ }
+ if (eol == 0) {
+ state = STATE_DONE;
+ break;
+ }
+ int colon = -1;
+ for (int i=0; i<eol; i++) {
+ if (stream.get(i) == ':') {
+ colon = i;
+ break;
+ }
+ }
+ String name = getString(stream, 0, colon);
+ int valueStart = colon+1;
+ while (stream.get(valueStart) == ' ') {
+ valueStart++;
+ }
+ String orgValue = getString(stream, valueStart, eol);
+ String value = processHeader(name, orgValue);
+ if (value == null) {
+ stream.discard(eol+2);
+ } else if (value == orgValue) {
+ stream.skip(eol+2);
+ } else {
+ stream.skip(valueStart);
+ stream.discard(eol-valueStart);
+ insert(stream, value);
+ stream.skip(2);
+ }
+ break;
+ }
+ default:
+ stream.skipAll();
+ }
+ }
+ }
+
+ private static int searchEndOfLine(Stream stream) {
+ for (int i=0; i<stream.available()-1; i++) {
+ if (stream.get(i) == '\r' && stream.get(i+1) == '\n') {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+ private static String getString(Stream stream, int begin, int end) {
+ StringBuffer buffer = new StringBuffer(end-begin);
+ for (int i=begin; i<end; i++) {
+ buffer.append((char)stream.get(i));
+ }
+ return buffer.toString();
+ }
+
+ private static void insert(Stream stream, String s) throws IOException {
+ byte[] b = s.getBytes("ascii");
+ stream.insert(b, 0, b.length);
+ }
+
+ protected String processRequest(String request) throws IOException {
+ return request;
+ }
+
+ protected String processHeader(String name, String value) throws IOException {
+ return value;
+ }
+}
Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Pipeline.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Pipeline.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Pipeline.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Pipeline.java Fri Jan 2 16:43:14 2009
@@ -0,0 +1,324 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.LinkedList;
+
+/**
+ * Class that sends a byte stream through a sequence of filters.
+ * Each filter receives as input the (potentially modified) output
+ * of the previous filter in the chain.
+ * <p>
+ * The pipeline works in push
+ * mode and is non blocking. This distinguishes it from a chain
+ * of (Filter)InputStreams. Also, a filter is not required to process
+ * all received data on each invocation (in which case it gets a chance to process
+ * the unprocessed data during the next invocation). This is useful when
+ * the filter requires more lookahead than is available. This
+ * distinguishes a pipeline from a chain of (Filter)OutputStreams (where each
+ * OutputStream is responsible itself for buffering unprocessed data).
+ * <p>
+ * Finally, the implementation optimizes buffer allocation and minimizes
+ * array copy operations.
+ */
+public class Pipeline {
+ private class StreamImpl implements Stream {
+ private final StreamFilter filter;
+ private StreamImpl next;
+ private byte[] inBuffer;
+ private int inOffset;
+ private int inLength;
+ private boolean lastBuffer;
+ private boolean preserve;
+ private int skipOffset;
+ private int skipLength;
+ private byte[] outBuffer;
+ private int outLength;
+ private boolean eosSignalled; // Set to true if the next filter has been invoked with eos == true
+
+ public StreamImpl(StreamFilter filter) {
+ this.filter = filter;
+ }
+
+ public void setNext(StreamImpl next) {
+ this.next = next;
+ }
+
+ public void invoke(byte[] buffer, int offset, int length, boolean eos, boolean preserve) throws IOException {
+ while (length > 0) {
+ if (inLength > 0) {
+ int c = fillBuffer(buffer, offset, length);
+ if (c == 0) {
+ throw new IOException("Pipeline buffer overflow");
+ }
+ offset += c;
+ length -= c;
+ } else {
+ setBuffer(buffer, offset, length, preserve);
+ offset += length;
+ length = 0;
+ }
+ this.lastBuffer = eos && length == 0;
+ filter.invoke(this);
+ }
+ flushSkip(eos);
+ flushOutput(eos);
+ if (inLength > 0) {
+ if (eos) {
+ throw new IllegalStateException("The filter didn't consume all its input");
+ }
+ if (this.preserve) {
+ compactBuffer();
+ }
+ } else if (inBuffer != null) {
+ if (!this.preserve) {
+ releaseBuffer(inBuffer);
+ }
+ inBuffer = null;
+ inOffset = 0;
+ inLength = 0;
+ }
+ }
+
+ private void setBuffer(byte[] buffer, int offset, int length, boolean preserve) throws IOException {
+ flushSkip(false);
+ if (inBuffer != null) {
+ if (inLength > 0) {
+ throw new IllegalStateException();
+ }
+ if (!this.preserve) {
+ releaseBuffer(inBuffer);
+ }
+ }
+ inBuffer = buffer;
+ inOffset = offset;
+ inLength = length;
+ this.preserve = preserve;
+ }
+
+ private int fillBuffer(byte[] buffer, int offset, int length) throws IOException {
+ if (!preserve && length <= inBuffer.length-inOffset-inLength) {
+ System.arraycopy(buffer, offset, inBuffer, inOffset+inLength, length);
+ inLength += length;
+ return length;
+ } else {
+ compactBuffer();
+ int c = Math.min(length, inBuffer.length-inOffset-inLength);
+ System.arraycopy(buffer, offset, inBuffer, inLength, c);
+ inLength += c;
+ return c;
+ }
+ }
+
+ private void compactBuffer() throws IOException {
+ flushSkip(false);
+ byte[] src = inBuffer;
+ if (preserve) {
+ inBuffer = allocateBuffer();
+ preserve = false;
+ }
+ System.arraycopy(src, inOffset, inBuffer, 0, inLength);
+ inOffset = 0;
+ }
+
+ private void invokeNext(byte[] buffer, int offset, int length, boolean eos, boolean preserve) throws IOException {
+ if (eos && eosSignalled) {
+ throw new IllegalStateException();
+ }
+ if (next != null) {
+ next.invoke(buffer, offset, length, eos, preserve);
+ } else if (!preserve) {
+ releaseBuffer(buffer);
+ }
+ eosSignalled = eos;
+ }
+
+ private void flushSkip(boolean eos) throws IOException {
+ if (skipLength > 0) {
+ if (outLength > 0) {
+ throw new IllegalStateException();
+ }
+ if (inLength == 0 && !preserve) {
+ invokeNext(inBuffer, skipOffset, skipLength, eos, false);
+ inBuffer = null;
+ inOffset = 0;
+ } else {
+ invokeNext(inBuffer, skipOffset, skipLength, eos, true);
+ }
+ skipLength = 0;
+ }
+ }
+
+ private void flushOutput(boolean eos) throws IOException {
+ if (outLength > 0) {
+ if (skipLength > 0) {
+ throw new IllegalStateException();
+ }
+ invokeNext(outBuffer, 0, outLength, eos, false);
+ outBuffer = null;
+ outLength = 0;
+ }
+ }
+
+ public int available() {
+ return inLength;
+ }
+
+ public boolean isEndOfStream() {
+ return lastBuffer && inLength == 0;
+ }
+
+ public int get() {
+ return get(0);
+ }
+
+ public int get(int lookahead) {
+ if (lookahead < 0) {
+ throw new ArrayIndexOutOfBoundsException();
+ } else if (lookahead >= inLength) {
+ if (lastBuffer) {
+ return -1;
+ } else {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ } else {
+ return inBuffer[inOffset+lookahead];
+ }
+ }
+
+ public int read(byte[] buffer, int offset, int length) {
+ int c = Math.min(length, inLength);
+ System.arraycopy(inBuffer, inOffset, buffer, offset, c);
+ return c;
+ }
+
+ public void readAll(OutputStream out) throws IOException {
+ out.write(inBuffer, inOffset, inLength);
+ }
+
+ public byte discard() {
+ byte b = inBuffer[inOffset];
+ discard(1);
+ return b;
+ }
+
+ public void discard(int len) {
+ if (len < 0 || len > inLength) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ inOffset += len;
+ inLength -= len;
+ }
+
+ public void insert(byte b) throws IOException {
+ flushSkip(false);
+ if (outLength > 0 && outLength == outBuffer.length) {
+ flushOutput(false);
+ }
+ if (outBuffer == null) {
+ outBuffer = allocateBuffer();
+ }
+ outBuffer[outLength++] = b;
+ }
+
+ public void insert(byte[] buffer, int offset, int length) throws IOException {
+ flushSkip(false);
+ flushOutput(false);
+ invokeNext(buffer, offset, length, false, true);
+ }
+
+ public byte skip() throws IOException {
+ byte b = inBuffer[inOffset];
+ skip(1);
+ return b;
+ }
+
+ public void skip(int len) throws IOException {
+ if (len < 0 || len > inLength) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+ flushOutput(false);
+ if (skipLength == 0) {
+ skipOffset = inOffset;
+ skipLength = len;
+ } else if (skipOffset+skipLength != inOffset) {
+ // This means that some bytes have been discarded after
+ // the last skip operation.
+ flushSkip(false);
+ skipOffset = inOffset;
+ skipLength = len;
+ } else {
+ skipLength += len;
+ }
+ inOffset += len;
+ inLength -= len;
+ }
+
+ public void skipAll() throws IOException {
+ skip(inLength);
+ }
+ }
+
+ private final int bufferSize;
+ private final LinkedList buffers = new LinkedList();
+ private StreamImpl first;
+ private StreamImpl last;
+
+ public Pipeline(int bufferSize) {
+ this.bufferSize = bufferSize;
+ }
+
+ public Pipeline() {
+ this(4096);
+ }
+
+ byte[] allocateBuffer() {
+ return buffers.isEmpty() ? new byte[bufferSize] : (byte[])buffers.removeFirst();
+ }
+
+ void releaseBuffer(byte[] buffer) {
+ if (buffer == null) {
+ throw new IllegalArgumentException();
+ }
+ buffers.add(buffer);
+ }
+
+ public void addFilter(StreamFilter filter) {
+ StreamImpl node = new StreamImpl(filter);
+ if (first == null) {
+ first = node;
+ }
+ if (last != null) {
+ last.setNext(node);
+ }
+ last = node;
+ }
+
+ public int readFrom(InputStream in) throws IOException {
+ byte[] buffer = allocateBuffer();
+ int read = in.read(buffer);
+ if (read == -1) {
+ first.invoke(buffer, 0, 0, true, false);
+ } else {
+ first.invoke(buffer, 0, read, false, false);
+ }
+ return read;
+ }
+}
Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/RequestLineExtractor.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/RequestLineExtractor.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/RequestLineExtractor.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/RequestLineExtractor.java Fri Jan 2 16:43:14 2009
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+
+/**
+ * Filter that extracts the first line of a request, up to a given
+ * maximum length.
+ */
+public abstract class RequestLineExtractor implements StreamFilter {
+    private final byte[] buffer; // Collects the bytes of the first line
+    private int length;          // Number of bytes collected so far
+    private boolean done;        // Set once the line has been reported
+
+    /**
+     * @param maxLength the maximum number of bytes to extract; if the
+     *        first line is longer, it is truncated to this length
+     */
+    public RequestLineExtractor(int maxLength) {
+        this.buffer = new byte[maxLength];
+    }
+
+    /**
+     * Pass all data through unmodified while collecting the bytes of
+     * the first line. {@link #done(String)} is called once: when the
+     * first line feed is seen, when the maximum length is reached, or
+     * when the stream ends after at least one byte has been collected.
+     * The line feed itself is not part of the reported string; a
+     * preceding carriage return is.
+     *
+     * @param stream the stream to process
+     * @throws IOException if the data can't be passed down the pipeline
+     */
+    public void invoke(Stream stream) throws IOException {
+        if (done) {
+            stream.skipAll();
+            return;
+        }
+        // With maxLength == 0 the limit is reached before the first byte;
+        // storing a byte would overflow the (empty) buffer.
+        if (length == buffer.length) {
+            done = true;
+        }
+        while (!done && stream.available() > 0) {
+            byte b = stream.skip();
+            if (b == '\n') {
+                done = true;
+            } else {
+                buffer[length++] = b;
+                if (length == buffer.length) {
+                    done = true;
+                }
+            }
+        }
+        // No line feed will ever come: report what has been collected.
+        if (!done && length > 0 && stream.isEndOfStream()) {
+            done = true;
+        }
+        if (done) {
+            stream.skipAll();
+            done(new String(buffer, 0, length, "ascii"));
+        }
+    }
+
+    /**
+     * Called once when the request line has been extracted.
+     *
+     * @param requestLine the (possibly truncated) first line
+     */
+    protected abstract void done(String requestLine);
+}
Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Stream.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Stream.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Stream.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Stream.java Fri Jan 2 16:43:14 2009
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Interface used by a filter to access the stream flowing through a pipeline.
+ */
+public interface Stream {
+ /**
+ * Get the number of bytes currently available in the stream.
+ *
+ * @return the number of available bytes
+ */
+ int available();
+
+ /**
+ * Check if the end of the stream has been reached.
+ *
+ * @return true if the end of the stream has been reached
+ */
+ boolean isEndOfStream();
+
+ /**
+ * Get the byte at the current position in the stream.
+ * Calling this method will not modify the current position in
+ * the stream.
+ *
+ * @return the byte at the current position in the stream
+ * or -1 if the end of the stream has been reached
+ * @throws ArrayIndexOutOfBoundsException if the byte at the
+ * current position is not yet available. This is the
+ * case if {@link #available()} returns 0 and
+ * {@link #isEndOfStream()} returns false.
+ */
+ int get();
+
+ /**
+ * Get the byte at a given distance from the current position in
+ * the stream.
+ * Calling this method will not modify the current position in
+ * the stream.
+ *
+ * @param lookahead the distance from the current position
+ * @return the byte at the given position, or -1 if the position
+ * is past the end of the stream
+ * @throws ArrayIndexOutOfBoundsException if the byte at the
+ * given position is not yet available
+ */
+ int get(int lookahead);
+
+ /**
+ * Read data from the stream into a byte array, starting from the
+ * current position in the stream.
+ * Calling this method will not modify the current position in
+ * the stream.
+ *
+ * @param buffer the buffer into which the data is read
+ * @param offset the start offset in array <code>buffer</code>
+ * at which the data is written
+ * @param length the maximum number of bytes to read
+ * @return the total number of bytes read into the buffer
+ */
+ int read(byte[] buffer, int offset, int length);
+
+ /**
+ * Read all currently available data from the stream and
+ * copy it to an {@link OutputStream} object.
+ * Calling this method will not modify the current position in
+ * the stream.
+ *
+ * @param out the output stream to write the data to
+ * @throws IOException if an I/O error occurred when writing
+ * to the output stream
+ */
+ void readAll(OutputStream out) throws IOException;
+
+ /**
+ * Discard the byte at the current position in the stream.
+ * This method increments the current position without copying
+ * the byte to the next filter in the pipeline.
+ *
+ * @return the byte that has been discarded
+ */
+ byte discard();
+
+ /**
+ * Discard a given number of bytes from the stream, starting
+ * at the current position.
+ * The current position is incremented by <code>len</code> and
+ * the discarded bytes are not copied to the next filter.
+ *
+ * @param len the number of bytes to discard
+ */
+ void discard(int len);
+
+ /**
+ * Insert a byte at the current position in the stream.
+ * The logical position after invocation of this method will
+ * be just after the inserted byte, i.e. the inserted byte can't
+ * be read, discarded or skipped.
+ *
+ * @param b the byte to insert
+ * @throws IOException if an I/O error occurs while data is
+ * passed down the pipeline
+ */
+ void insert(byte b) throws IOException;
+
+ /**
+ * Insert a byte sequence at the current position in the stream.
+ * The logical position after invocation of this method will
+ * be just after the last inserted byte.
+ *
+ * @param buffer a byte array containing the sequence to be inserted in the stream
+ * @param offset the start offset in the byte array
+ * @param length the number of bytes to insert
+ * @throws IOException if an I/O error occurs while data is
+ * passed down the pipeline
+ */
+ void insert(byte[] buffer, int offset, int length) throws IOException;
+
+ /**
+ * Skip the byte at the current position in the stream.
+ * This will increment the current position and copy the byte
+ * to the next filter.
+ *
+ * @return the byte that has been skipped
+ * @throws IOException if an I/O error occurs while data is
+ * passed down the pipeline
+ */
+ byte skip() throws IOException;
+
+ /**
+ * Skip a given number of bytes in the stream, starting
+ * from the current position.
+ * The current position is incremented by <code>len</code> and
+ * the skipped bytes are copied to the next filter.
+ *
+ * @param len the number of bytes to skip
+ * @throws IOException if an I/O error occurs while data is
+ * passed down the pipeline
+ */
+ void skip(int len) throws IOException;
+
+ /**
+ * Skip all the bytes currently available in the stream.
+ * The instruction <code>s.skipAll()</code> is equivalent to
+ * <code>s.skip(s.available())</code>.
+ *
+ * @throws IOException if an I/O error occurs while data is
+ * passed down the pipeline
+ */
+ void skipAll() throws IOException;
+}
Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/StreamFilter.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/StreamFilter.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/StreamFilter.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/StreamFilter.java Fri Jan 2 16:43:14 2009
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+
+/**
+ * A filter acting on a stream.
+ */
+public interface StreamFilter {
+ /**
+ * Invoke the filter. This method is called by {@link Pipeline}
+ * when data is available for processing. The implementation can
+ * modify the stream by discarding bytes from the stream and
+ * inserting new data. If it doesn't wish to modify the stream,
+ * it should skip the relevant parts, so that they will be processed
+ * by the next filter in the pipeline.
+ * <p>
+ * An implementation is not required to process (skip or discard)
+ * all the data available on each invocation. If after the invocation
+ * of this method {@link Stream#available()} is non zero, the remaining
+ * (unprocessed) data will be available again during the next invocation
+ * of the filter.
+ *
+ * @param stream the stream to process
+ * @throws IOException if an I/O error occurs while the filter
+ * processes the stream
+ */
+ void invoke(Stream stream) throws IOException;
+}
Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Tee.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Tee.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Tee.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/Tee.java Fri Jan 2 16:43:14 2009
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A filter that copies all the data in the stream to a given {@link OutputStream}.
+ * The data is written to the output stream before being processed by filters further
+ * down the pipeline. The target output stream can be specified using the
+ * {@link #Tee(OutputStream)} constructor or later using the
+ * {@link #setOutputStream(OutputStream)} method. If data is submitted to the filter before the
+ * output stream has been set, it will be held back until
+ * {@link #setOutputStream(OutputStream)} has been called and the filter is
+ * invoked again.
+ */
+public class Tee implements StreamFilter {
+ private OutputStream out;
+
+ public Tee() {
+ }
+
+ public Tee(OutputStream out) {
+ setOutputStream(out);
+ }
+
+ public void setOutputStream(OutputStream out) {
+ if (this.out != null) {
+ throw new IllegalStateException("The output stream has already been set");
+ }
+ this.out = out;
+ }
+
+ public void invoke(Stream stream) throws IOException {
+ if (out != null) {
+ stream.readAll(out);
+ stream.skipAll();
+ }
+ }
+}
Added: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/XmlFormatFilter.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/XmlFormatFilter.java?rev=730880&view=auto
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/XmlFormatFilter.java (added)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/core/filter/XmlFormatFilter.java Fri Jan 2 16:43:14 2009
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2004,2005 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ws.commons.tcpmon.core.filter;
+
+import java.io.IOException;
+
+/**
+ * Filter that reformats XML data so that it is properly indented.
+ */
+public class XmlFormatFilter implements StreamFilter {
+    private final int tabWidth;
+    private int nextIndent = -1;
+    private int previousIndent = -1;
+
+    /**
+     * @param tabWidth the number of spaces per indentation level
+     */
+    public XmlFormatFilter(int tabWidth) {
+        this.tabWidth = tabWidth;
+    }
+
+    /**
+     * Process the available data byte by byte: a line break and the
+     * indentation are inserted in front of tags, and line breaks found
+     * in the input after the first tag of this invocation are dropped.
+     * Processing stops (leaving the remaining data in the stream) when
+     * the required lookahead is not available yet.
+     *
+     * @param stream the stream to process
+     * @throws IOException if the data can't be passed down the pipeline
+     */
+    public void invoke(Stream stream) throws IOException {
+        boolean inXML = false;
+        try {
+            while (stream.available() > 0) {
+                final int current = stream.get(0);
+                int thisIndent = -1;
+                if (current == '<') {
+                    // The lookahead may fail with an ArrayIndexOutOfBoundsException
+                    // before any state has been modified for this byte.
+                    if (stream.get(1) != '/') {
+                        // Start tag: one level deeper.
+                        previousIndent = nextIndent++;
+                        thisIndent = nextIndent;
+                    } else {
+                        // End tag: only put it on its own line if it
+                        // doesn't directly follow its start tag.
+                        if (previousIndent > nextIndent) {
+                            thisIndent = nextIndent;
+                        }
+                        previousIndent = nextIndent--;
+                    }
+                    inXML = true;
+                } else if (current == '/' && stream.get(1) == '>') {
+                    // End of an empty element tag.
+                    previousIndent = nextIndent--;
+                    inXML = true;
+                }
+                if (thisIndent != -1) {
+                    if (thisIndent > 0) {
+                        stream.insert((byte) '\n');
+                    }
+                    int spaces = tabWidth * thisIndent;
+                    while (spaces > 0) {
+                        stream.insert((byte) ' ');
+                        spaces--;
+                    }
+                }
+                // Inserting doesn't move the position, so the byte at the
+                // current position is still the one read above.
+                boolean lineBreak = current == '\n' || current == '\r';
+                if (inXML && lineBreak) {
+                    stream.discard();
+                } else {
+                    stream.skip();
+                }
+            }
+        } catch (ArrayIndexOutOfBoundsException ex) {
+            // Not enough lookahead: wait for the next invocation.
+            return;
+        }
+    }
+}
Modified: webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/Connection.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/Connection.java?rev=730880&r1=730879&r2=730880&view=diff
==============================================================================
--- webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/Connection.java (original)
+++ webservices/commons/trunk/modules/tcpmon/src/org/apache/ws/commons/tcpmon/eclipse/ui/Connection.java Fri Jan 2 16:43:14 2009
@@ -17,16 +17,13 @@
package org.apache.ws.commons.tcpmon.eclipse.ui;
import java.io.InputStream;
-import java.io.OutputStream;
import java.net.Socket;
import org.eclipse.swt.SWT;
import org.eclipse.swt.widgets.TableItem;
import org.eclipse.swt.widgets.Text;
-import org.apache.ws.commons.tcpmon.SlowLinkSimulator;
import org.apache.ws.commons.tcpmon.TCPMonBundle;
import org.apache.ws.commons.tcpmon.core.AbstractConnection;
-import org.apache.ws.commons.tcpmon.core.AbstractSocketRR;
/**
* a connection listens to a single current connection
@@ -109,21 +106,6 @@
outputWriter = new TextWidgetWriter(outputText);
}
- protected AbstractSocketRR createInputSocketRR(Socket inSocket, InputStream inputStream,
- Socket outSocket, OutputStream outputStream, boolean format,
- SlowLinkSimulator slowLink) {
- return new SocketRR(this, inSocket, inputStream, outSocket, outputStream,
- format, listener.connectionTable,
- listener.connections.indexOf(this) + 1, slowLink, inputWriter);
- }
-
- protected AbstractSocketRR createOutputSocketRR(Socket outSocket, InputStream inputStream,
- Socket inSocket, OutputStream outputStream, boolean format,
- SlowLinkSimulator slowLink) {
- return new SocketRR(this, outSocket, inputStream, inSocket, outputStream,
- format, null, 0, slowLink, outputWriter);
- }
-
private void setValue(final int column, final String value) {
final int index = listener.connections.indexOf(this);
if (index >= 0) {