You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cxf.apache.org by dk...@apache.org on 2008/07/19 05:39:38 UTC
svn commit: r678094 - in /cxf/trunk: api/src/main/java/org/apache/cxf/io/
common/common/src/main/java/org/apache/cxf/helpers/
rt/transports/http/src/main/java/org/apache/cxf/transport/http/
rt/transports/http/src/main/java/org/apache/cxf/transport/http...
Author: dkulp
Date: Fri Jul 18 20:39:37 2008
New Revision: 678094
URL: http://svn.apache.org/viewvc?rev=678094&view=rev
Log:
Implement threshold for chunking. Some http cleanups. Throw exceptions on some errors.
Added:
cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java (with props)
Modified:
cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java
cxf/trunk/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/policy/PolicyUtils.java
cxf/trunk/rt/transports/http/src/main/resources/schemas/wsdl/http-conf.xsd
cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java
Added: cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java?rev=678094&view=auto
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java (added)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java Fri Jul 18 20:39:37 2008
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.cxf.io;
+
+import java.io.IOException;
+
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
+
+/**
+ * Outputstream that will buffer a certain amount before writing anything to the underlying
+ * stream. When the threshold is reached, provides a callback point to allow the
+ * subclass to update headers, replace/set the output stream, etc...
+ *
+ * Also provides a callback for when the stream is closed without it reaching the threshold.
+ */
+public abstract class AbstractThresholdOutputStream extends AbstractWrappedOutputStream {
+
+ protected int threshold;
+ protected LoadingByteArrayOutputStream buffer;
+
+ public AbstractThresholdOutputStream(int threshold) {
+ this.threshold = threshold;
+ if (threshold > 0) {
+ buffer = new LoadingByteArrayOutputStream(threshold + 1);
+ }
+ }
+
+
+ public abstract void thresholdReached();
+ public abstract void thresholdNotReached();
+
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ if (buffer != null) {
+ int space = threshold - buffer.size();
+ if (space > len) {
+ space = len;
+ }
+ buffer.write(b, off, space);
+ len -= space;
+ off += space;
+
+ if (buffer.size() >= threshold) {
+ thresholdReached();
+ unBuffer();
+ }
+ if (len == 0) {
+ return;
+ }
+ }
+ super.write(b, off, len);
+ }
+
+
+ @Override
+ public void write(int b) throws IOException {
+ if (buffer != null) {
+ buffer.write(b);
+ if (buffer.size() >= threshold) {
+ thresholdReached();
+ unBuffer();
+ }
+ return;
+ }
+ super.write(b);
+ }
+
+ public void unBuffer() throws IOException {
+ if (buffer != null) {
+ if (buffer.size() > 0) {
+ super.write(buffer.getRawBytes(), 0, buffer.size());
+ }
+ buffer = null;
+ }
+ }
+
+
+ @Override
+ public void close() throws IOException {
+ if (buffer != null) {
+ thresholdNotReached();
+ unBuffer();
+ }
+ super.close();
+ }
+
+}
Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange: cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractThresholdOutputStream.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified: cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java (original)
+++ cxf/trunk/api/src/main/java/org/apache/cxf/io/AbstractWrappedOutputStream.java Fri Jul 18 20:39:37 2008
@@ -50,11 +50,7 @@
@Override
public void write(byte[] b) throws IOException {
- if (!written) {
- onFirstWrite();
- written = true;
- }
- wrappedStream.write(b);
+ write(b, 0, b.length);
}
@Override
Modified: cxf/trunk/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java
URL: http://svn.apache.org/viewvc/cxf/trunk/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java (original)
+++ cxf/trunk/common/common/src/main/java/org/apache/cxf/helpers/LoadingByteArrayOutputStream.java Fri Jul 18 20:39:37 2008
@@ -53,4 +53,8 @@
}
return buf;
}
+
+ public byte[] getRawBytes() {
+ return buf;
+ }
}
\ No newline at end of file
Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/HTTPConduit.java Fri Jul 18 20:39:37 2008
@@ -51,7 +51,8 @@
import org.apache.cxf.configuration.security.ProxyAuthorizationPolicy;
import org.apache.cxf.helpers.CastUtils;
import org.apache.cxf.helpers.HttpHeaderHelper;
-import org.apache.cxf.io.AbstractWrappedOutputStream;
+import org.apache.cxf.helpers.LoadingByteArrayOutputStream;
+import org.apache.cxf.io.AbstractThresholdOutputStream;
import org.apache.cxf.io.CacheAndWriteOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.ExchangeImpl;
@@ -510,6 +511,7 @@
}
boolean isChunking = false;
+ int chunkThreshold = 0;
// We must cache the request if we have basic auth supplier
// without preemptive basic auth.
if (basicAuthSupplier != null) {
@@ -521,24 +523,23 @@
+ " We must cache request.");
}
if (getClient().isAutoRedirect()) {
- // If the AutoRedirect property is set then we cannot
- // use chunked streaming mode. We ignore the "AllowChunking"
- // property if AutoRedirect is turned on.
-
needToCacheRequest = true;
LOG.log(Level.INFO, "AutoRedirect is turned on.");
- } else {
- if (!connection.getRequestMethod().equals("GET")
- && getClient().isAllowChunking()
- && !needToCacheRequest) {
- //TODO: The chunking mode be configured or at least some
- // documented client constant.
- //use -1 and allow the URL connection to pick a default value
- connection.setChunkedStreamingMode(-1);
- isChunking = true;
+ }
+ if (!connection.getRequestMethod().equals("GET")
+ && getClient().isAllowChunking()) {
+ //TODO: The chunking mode be configured or at least some
+ // documented client constant.
+ //use -1 and allow the URL connection to pick a default value
+ isChunking = true;
+ chunkThreshold = getClient().getChunkingThreshold();
+ if (chunkThreshold <= 0) {
+ chunkThreshold = 0;
+ connection.setChunkedStreamingMode(-1);
}
}
+
//Do we need to maintain a session?
maintainSession = Boolean.TRUE.equals((Boolean)message.get(Message.MAINTAIN_SESSION));
@@ -567,8 +568,10 @@
message.setContent(OutputStream.class,
new WrappedOutputStream(
message, connection,
- needToCacheRequest, isChunking));
-
+ needToCacheRequest,
+ isChunking,
+ chunkThreshold));
+
// We are now "ready" to "send" the message.
}
@@ -1411,7 +1414,11 @@
+ newURL
+ "'");
}
- return connection;
+ throw new IOException("Redirect loop detected on Conduit \""
+ + getConduitName()
+ + "\" on '"
+ + newURL
+ + "'");
}
// We are going to redirect.
// Remove any Server Authentication Information for the previous
@@ -1515,7 +1522,12 @@
+ "\"");
}
- return connection;
+ throw new IOException("Authorization loop detected on Conduit \""
+ + getConduitName()
+ + "\" on URL \""
+ + "\" with realm \""
+ + realm
+ + "\"");
}
HttpBasicAuthSupplier.UserPass up =
@@ -1583,6 +1595,8 @@
}
message.put(KEY_HTTP_CONNECTION, connection);
+ connection.setFixedLengthStreamingMode(stream.size());
+
// Need to set the headers before the trust decision
// because they are set before the connect().
setURLRequestHeaders(message);
@@ -1602,7 +1616,7 @@
return connection;
}
- // Trust is okay, write the cached request.
+ // Trust is okay, write the cached request
OutputStream out = connection.getOutputStream();
stream.writeCacheTo(out);
@@ -1710,7 +1724,7 @@
* Wrapper output stream responsible for flushing headers and handling
* the incoming HTTP-level response (not necessarily the MEP response).
*/
- protected class WrappedOutputStream extends AbstractWrappedOutputStream {
+ protected class WrappedOutputStream extends AbstractThresholdOutputStream {
/**
* This field contains the currently active connection.
*/
@@ -1739,14 +1753,30 @@
Message m,
HttpURLConnection c,
boolean possibleRetransmit,
- boolean isChunking
+ boolean isChunking,
+ int chunkThreshold
) {
- super();
+ super(chunkThreshold);
this.outMessage = m;
connection = c;
cachingForRetransmission = possibleRetransmit;
chunking = isChunking;
}
+
+
+ @Override
+ public void thresholdNotReached() {
+ if (chunking) {
+ connection.setFixedLengthStreamingMode(buffer.size());
+ }
+ }
+
+ @Override
+ public void thresholdReached() {
+ if (chunking) {
+ connection.setChunkedStreamingMode(-1);
+ }
+ }
/**
* Perform any actions required on stream flush (freeze headers,
@@ -1811,6 +1841,12 @@
* Perform any actions required on stream closure (handle response etc.)
*/
public void close() throws IOException {
+ if (buffer != null && buffer.size() > 0) {
+ thresholdNotReached();
+ LoadingByteArrayOutputStream tmp = buffer;
+ buffer = null;
+ super.write(tmp.getRawBytes(), 0, tmp.size());
+ }
if (!written) {
handleHeadersTrustCaching();
}
@@ -1988,6 +2024,8 @@
incomingObserver.onMessage(inMessage);
}
+
+
}
/**
Modified: cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/policy/PolicyUtils.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/policy/PolicyUtils.java?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/policy/PolicyUtils.java (original)
+++ cxf/trunk/rt/transports/http/src/main/java/org/apache/cxf/transport/http/policy/PolicyUtils.java Fri Jul 18 20:39:37 2008
@@ -302,7 +302,8 @@
}
if (compatible) {
- compatible &= p1.getContentType().equals(p2.getContentType());
+ compatible = !p1.isSetContentType() || !p2.isSetContentType()
+ || p1.getContentType().equals(p2.getContentType());
}
if (compatible) {
@@ -495,7 +496,7 @@
}
if (compatible) {
- compatible &= p1.getContentType().equals(p2.getContentType());
+ compatible &= compatible(p1.getContentType(), p2.getContentType());
}
if (compatible) {
Modified: cxf/trunk/rt/transports/http/src/main/resources/schemas/wsdl/http-conf.xsd
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/main/resources/schemas/wsdl/http-conf.xsd?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/main/resources/schemas/wsdl/http-conf.xsd (original)
+++ cxf/trunk/rt/transports/http/src/main/resources/schemas/wsdl/http-conf.xsd Fri Jul 18 20:39:37 2008
@@ -117,10 +117,10 @@
</xs:annotation>
</xs:attribute>
- <xs:attribute name="ContentType" type="xs:string" use="optional" default="text/xml">
+ <xs:attribute name="ContentType" type="xs:string" use="optional">
<xs:annotation>
<xs:documentation>
- what MIME type this reply is
+ what MIME type this reply is. Default is set by the binding.
</xs:documentation>
</xs:annotation>
</xs:attribute>
@@ -204,6 +204,14 @@
</xs:documentation>
</xs:annotation>
</xs:attribute>
+ <xs:attribute name="ChunkingThreshold" type="xs:int" use="optional" default="4096">
+ <xs:annotation>
+ <xs:documentation>
+ If AllowChunking is true, this sets the threshold at which mesages start
+ getting chunked. Messages under this limit do not get chunked.
+ </xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
<xs:attribute name="Accept" type="xs:string" use="optional">
<xs:annotation>
@@ -229,7 +237,7 @@
</xs:annotation>
</xs:attribute>
- <xs:attribute name="ContentType" type="xs:string" use="optional" default="text/xml">
+ <xs:attribute name="ContentType" type="xs:string" use="optional">
<xs:annotation>
<xs:documentation>
The content type of the stream being sent in a post request
Modified: cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java?rev=678094&r1=678093&r2=678094&view=diff
==============================================================================
--- cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java (original)
+++ cxf/trunk/rt/transports/http/src/test/java/org/apache/cxf/transport/http/HTTPConduitURLEasyMockTest.java Fri Jul 18 20:39:37 2008
@@ -36,6 +36,7 @@
import org.apache.cxf.bus.CXFBusImpl;
import org.apache.cxf.configuration.security.AuthorizationPolicy;
import org.apache.cxf.helpers.CastUtils;
+import org.apache.cxf.io.AbstractThresholdOutputStream;
import org.apache.cxf.message.Exchange;
import org.apache.cxf.message.Message;
import org.apache.cxf.message.MessageImpl;
@@ -408,6 +409,7 @@
conduit.getClient().setAutoRedirect(autoRedirect);
if (!autoRedirect) {
conduit.getClient().setAllowChunking(true);
+ conduit.getClient().setChunkingThreshold(0);
}
}
@@ -561,10 +563,12 @@
control.replay();
- OutputStream wrappedOS = message.getContent(OutputStream.class);
+ AbstractThresholdOutputStream wrappedOS
+ = (AbstractThresholdOutputStream) message.getContent(OutputStream.class);
assertNotNull("expected output stream", wrappedOS);
wrappedOS.write(PAYLOAD.getBytes());
+ wrappedOS.unBuffer();
control.verify();
control.reset();