You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commons-dev@ws.apache.org by sc...@apache.org on 2008/05/22 17:27:14 UTC
svn commit: r659134 - in /webservices/commons/trunk/modules/axiom/modules:
axiom-api/src/main/java/org/apache/axiom/attachments/impl/
axiom-api/src/main/java/org/apache/axiom/attachments/utils/
axiom-tests/src/test/java/org/apache/axiom/attachments/impl/
Author: scheu
Date: Thu May 22 08:27:13 2008
New Revision: 659134
URL: http://svn.apache.org/viewvc?rev=659134&view=rev
Log:
WSCOMMONS-349
Contributor:Rich Scheuerle
Performance Analysis: Tim Hefele
Localized improvements to PartFactory and related code to improve the throughput for large (>100K) attachments.
Added verification test too.
Added:
webservices/commons/trunk/modules/axiom/modules/axiom-tests/src/test/java/org/apache/axiom/attachments/impl/BufferUtilsTest.java
- copied, changed from r652071, webservices/commons/trunk/modules/axiom/modules/axiom-tests/src/test/java/org/apache/axiom/attachments/impl/BufferUtilsTests.java
Removed:
webservices/commons/trunk/modules/axiom/modules/axiom-tests/src/test/java/org/apache/axiom/attachments/impl/BufferUtilsTests.java
Modified:
webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/BufferUtils.java
webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/PartFactory.java
webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/PartOnMemoryEnhanced.java
webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/utils/BAAInputStream.java
webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/utils/BAAOutputStream.java
Modified: webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/BufferUtils.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/BufferUtils.java?rev=659134&r1=659133&r2=659134&view=diff
==============================================================================
--- webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/BufferUtils.java (original)
+++ webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/BufferUtils.java Thu May 22 08:27:13 2008
@@ -30,6 +30,7 @@
import javax.activation.DataSource;
import javax.activation.FileDataSource;
+import org.apache.axiom.attachments.utils.BAAOutputStream;
import org.apache.axiom.om.OMException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,8 +44,9 @@
private static Log log = LogFactory.getLog(BufferUtils.class);
// Performance testing indicates that 4K is the best size for medium
// and small payloads. And there is a neglible effect on large payloads.
- static int BUFFER_LEN = 4 * 1024; // Copy Buffer size
- static boolean ENABLE_FILE_CHANNEL = true; // Enable file channel optimization
+ public final static int BUFFER_LEN = 4 * 1024; // Copy Buffer size
+ static boolean ENABLE_FILE_CHANNEL = true; // Enable file channel optimization
+ static boolean ENABLE_BAAOS_OPT = true; // Enable BAAOutputStream opt
private static byte[] _cacheBuffer = new byte[BUFFER_LEN];
private static boolean _cacheBufferInUse = false;
@@ -63,13 +65,19 @@
throws IOException {
- // If this is a FileOutputStream, use th
+ // If this is a FileOutputStream, use the optimized method
if (ENABLE_FILE_CHANNEL && os instanceof FileOutputStream) {
if (inputStream2FileOutputStream(is, (FileOutputStream) os)) {
return;
}
}
+ // If this is a BAAOutputStream, use the optimized method
+ if (ENABLE_BAAOS_OPT && os instanceof BAAOutputStream) {
+ inputStream2BAAOutputStream(is, (BAAOutputStream) os, Long.MAX_VALUE);
+ return;
+ }
+
byte[] buffer = getTempBuffer();
try {
@@ -94,13 +102,18 @@
* @param is InputStream
* @param os OutputStream
* @param limit maximum number of bytes to read
- * @return total ytes read
+ * @return total bytes read
* @throws IOException
*/
public static int inputStream2OutputStream(InputStream is,
OutputStream os,
int limit)
throws IOException {
+
+ // If this is a BAAOutputStream, use the optimized method
+ if (ENABLE_BAAOS_OPT && os instanceof BAAOutputStream) {
+ return (int) inputStream2BAAOutputStream(is, (BAAOutputStream) os, (long) limit);
+ }
byte[] buffer = getTempBuffer();
int totalWritten = 0;
@@ -189,6 +202,21 @@
}
return true;
}
+
+ /**
+ * inputStream2BAAOutputStream
+ * @param is
+ * @param baaos
+ * @param limit
+ * @return
+ */
+ public static long inputStream2BAAOutputStream(InputStream is,
+ BAAOutputStream baaos,
+ long limit) throws IOException {
+ return baaos.receive(is, limit);
+ }
+
+
/**
* The method checks to see if attachment is eligble for optimization.
* An attachment is eligible for optimization if and only if the size of
Modified: webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/PartFactory.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/PartFactory.java?rev=659134&r1=659133&r2=659134&view=diff
==============================================================================
--- webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/PartFactory.java (original)
+++ webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/PartFactory.java Thu May 22 08:27:13 2008
@@ -47,8 +47,18 @@
*/
public class PartFactory {
+ private static int inflight = 0; // How many attachments are currently being built.
+ private static String semifore = "PartFactory.semifore";
+
private static Log log = LogFactory.getLog(PartFactory.class);
+ // Maximum number of threads allowed through createPart
+ private static int INFLIGHT_MAX = 4;
+
+ // Constants for dynamic threshold
+ // Dynamic Threshold = availMemory / THRESHOLD_FACTOR
+ private static final int THRESHOLD_FACTOR = 5;
+
/**
* Creates a part from the input stream.
* The remaining parameters are used to determine if the
@@ -57,7 +67,7 @@
*
* @param in MIMEBodyPartInputStream
* @param isSOAPPart
- * @param threshholdSize
+ * @param thresholdSize
* @param attachmentDir
* @param messageContentLength
* @return Part
@@ -65,14 +75,14 @@
*/
public static Part createPart(LifecycleManager manager, MIMEBodyPartInputStream in,
boolean isSOAPPart,
- int threshholdSize,
+ int thresholdSize,
String attachmentDir,
int messageContentLength
) throws OMException {
if(log.isDebugEnabled()){
log.debug("Start createPart()");
log.debug(" isSOAPPart=" + isSOAPPart);
- log.debug(" threshholdSize= " + threshholdSize);
+ log.debug(" thresholdSize= " + thresholdSize);
log.debug(" attachmentDir=" + attachmentDir);
log.debug(" messageContentLength " + messageContentLength);
}
@@ -84,44 +94,71 @@
Hashtable headers = new Hashtable();
InputStream dross = readHeaders(in, headers);
-
- if (isSOAPPart ||
- threshholdSize <= 0 ||
- (messageContentLength > 0 &&
- messageContentLength < threshholdSize)) {
- // If the entire message is less than the threshold size,
- // keep it in memory.
- // If this is a SOAPPart, keep it in memory.
+ Part part;
+ try {
- // Get the bytes of the data without a lot
- // of resizing and GC. The BAAOutputStream
- // keeps the data in non-contiguous byte buffers.
- BAAOutputStream baaos = new BAAOutputStream();
- BufferUtils.inputStream2OutputStream(dross, baaos);
- BufferUtils.inputStream2OutputStream(in, baaos);
- return new PartOnMemoryEnhanced(headers, baaos.buffers(), baaos.length());
- } else {
- // We need to read the input stream to determine whether
- // the size is bigger or smaller than the threshhold.
- BAAOutputStream baaos = new BAAOutputStream();
- int t1 = BufferUtils.inputStream2OutputStream(dross, baaos, threshholdSize);
- int t2 = BufferUtils.inputStream2OutputStream(in, baaos, threshholdSize - t1);
- int total = t1 + t2;
+ // Message throughput is increased if the number of threads in this
+ // section is limited to INFLIGHT_MAX. Allowing more threads tends to cause
+ // thrashing while reading from the HTTP InputStream.
+ // Allowing fewer threads reduces the thrashing. And when the remaining threads
+ // are notified their input (chunked) data is available.
+ synchronized(semifore) {
+ if (inflight >= INFLIGHT_MAX) {
+ semifore.wait();
+ }
+ inflight++;
+ }
+ if (thresholdSize > 0) {
+ // Get new threshold based on the current available memory in the runtime
+ thresholdSize = getRuntimeThreshold(thresholdSize, inflight);
+ }
+
- if (total < threshholdSize) {
- return new PartOnMemoryEnhanced(headers, baaos.buffers(), baaos.length());
+ if (isSOAPPart ||
+ thresholdSize <= 0 ||
+ (messageContentLength > 0 &&
+ messageContentLength < thresholdSize)) {
+ // If the entire message is less than the threshold size,
+ // keep it in memory.
+ // If this is a SOAPPart, keep it in memory.
+
+ // Get the bytes of the data without a lot
+ // of resizing and GC. The BAAOutputStream
+ // keeps the data in non-contiguous byte buffers.
+ BAAOutputStream baaos = new BAAOutputStream();
+ BufferUtils.inputStream2OutputStream(dross, baaos);
+ BufferUtils.inputStream2OutputStream(in, baaos);
+ part = new PartOnMemoryEnhanced(headers, baaos.buffers(), baaos.length());
} else {
- // A BAAInputStream is an input stream over a list of non-contiguous 4K buffers.
- BAAInputStream baais =
- new BAAInputStream(baaos.buffers(), baaos.length());
-
- return new PartOnFile(manager, headers,
- baais,
- in,
- attachmentDir);
+ // We need to read the input stream to determine whether
+ // the size is bigger or smaller than the threshold.
+ BAAOutputStream baaos = new BAAOutputStream();
+ int t1 = BufferUtils.inputStream2OutputStream(dross, baaos, thresholdSize);
+ int t2 = BufferUtils.inputStream2OutputStream(in, baaos, thresholdSize - t1);
+ int total = t1 + t2;
+
+ if (total < thresholdSize) {
+ return new PartOnMemoryEnhanced(headers, baaos.buffers(), baaos.length());
+ } else {
+ // A BAAInputStream is an input stream over a list of non-contiguous 4K buffers.
+ BAAInputStream baais =
+ new BAAInputStream(baaos.buffers(), baaos.length());
+
+ part = new PartOnFile(manager, headers,
+ baais,
+ in,
+ attachmentDir);
+ }
+
+ }
+ } finally {
+ synchronized(semifore) {
+ semifore.notify();
+ inflight--;
}
-
}
+
+ return part;
} catch (Exception e) {
throw new OMException(e);
@@ -243,6 +280,63 @@
}
/**
+ * This method checks the configured threshold and
+ * the current runtime information. If it appears that we could
+ * run out of memory, the threshold is reduced.
+ *
+ * This method allows the user to request a much larger threshold without
+ * fear of running out of memory. Using a larger in memory threshold generally
+ * results in better throughput.
+ *
+ * @param configThreshold
+ * @param inflight
+ * @return threshold
+ */
+ private static int getRuntimeThreshold(int configThreshold, int inflight) {
+
+ // Determine how much free memory is available
+ Runtime r = Runtime.getRuntime();
+ long totalmem = r.totalMemory();
+ long maxmem = r.maxMemory();
+ long freemem = r.freeMemory();
+
+ // @REVIEW
+ // If maximum is not defined...limit to 1G
+ if (maxmem == java.lang.Long.MAX_VALUE) {
+ maxmem = 1024*1024*1024;
+ }
+
+ long availmem = maxmem - (totalmem - freemem);
+
+
+ // Now determine the dynamic threshold
+ int dynamicThreshold = (int) availmem / (THRESHOLD_FACTOR * inflight);
+
+ // If it appears that we might run out of memory with this
+ // threshold, reduce the threshold size.
+ if (dynamicThreshold < configThreshold) {
+ if (log.isDebugEnabled()) {
+ log.debug("Using Runtime Attachment File Threshold " + dynamicThreshold);
+ log.debug("maxmem = " + maxmem);
+ log.debug("totalmem = " + totalmem);
+ log.debug("freemem = " + freemem);
+ log.debug("availmem = " + availmem);
+ }
+
+ } else {
+ dynamicThreshold = configThreshold;
+ if (log.isDebugEnabled()) {
+ log.debug("Using Configured Attachment File Threshold " + configThreshold);
+ log.debug("maxmem = " + maxmem);
+ log.debug("totalmem = " + totalmem);
+ log.debug("freemem = " + freemem);
+ log.debug("availmem = " + availmem);
+ }
+ }
+ return dynamicThreshold;
+ }
+
+ /**
* A normal ByteArrayOutputStream, except that it returns the buffer
* directly instead of returning a copy of the buffer.
*/
Modified: webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/PartOnMemoryEnhanced.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/PartOnMemoryEnhanced.java?rev=659134&r1=659133&r2=659134&view=diff
==============================================================================
--- webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/PartOnMemoryEnhanced.java (original)
+++ webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/impl/PartOnMemoryEnhanced.java Thu May 22 08:27:13 2008
@@ -63,7 +63,7 @@
public DataHandler getDataHandler() throws MessagingException {
DataSource ds = new MyByteArrayDataSource();
- return new DataHandler(ds);
+ return new MyDataHandler(ds);
}
/* (non-Javadoc)
@@ -83,6 +83,22 @@
}
+ class MyDataHandler extends DataHandler {
+
+ DataSource ds;
+ public MyDataHandler(DataSource ds) {
+ super(ds);
+ this.ds = ds;
+ }
+
+ public void writeTo(OutputStream os) throws IOException {
+ InputStream is = ds.getInputStream();
+ if (is instanceof BAAInputStream) {
+ ((BAAInputStream)is).writeTo(os);
+ }
+ }
+ }
+
/**
* A DataSource that is backed by the byte[] and
* headers map.
Modified: webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/utils/BAAInputStream.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/utils/BAAInputStream.java?rev=659134&r1=659133&r2=659134&view=diff
==============================================================================
--- webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/utils/BAAInputStream.java (original)
+++ webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/utils/BAAInputStream.java Thu May 22 08:27:13 2008
@@ -18,21 +18,24 @@
*/
package org.apache.axiom.attachments.utils;
+import org.apache.axiom.attachments.impl.BufferUtils;
+
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
/**
* BAAInputStream is like a ByteArrayInputStream.
* A ByteArrayInputStream stores the backing data in a byte[].
* BAAInputStream stores the backing data in a Array of
- * 4K byte[]. Using several non-contiguous chunks reduces
+ * byte[]. Using several non-contiguous chunks reduces
* memory copy and resizing.
*/
public class BAAInputStream extends InputStream {
ArrayList data = new ArrayList();
- int BUFFER_SIZE = 4 * 1024;
+ final static int BUFFER_SIZE = BufferUtils.BUFFER_LEN;
int i;
int size;
int currIndex;
@@ -111,4 +114,22 @@
totalIndex = mark;
}
+ /**
+ * Write all of the buffers to the output stream
+ * @param os
+ * @throws IOException
+ */
+ public void writeTo(OutputStream os) throws IOException {
+
+ if (data != null) {
+ int numBuffers = data.size();
+ for (int j = 0; j < numBuffers-1; j ++) {
+ os.write( (byte[]) data.get(j), 0, BUFFER_SIZE);
+ }
+ if (numBuffers > 0) {
+ int writeLimit = size - ((numBuffers-1) * BUFFER_SIZE);
+ os.write( (byte[]) data.get(numBuffers-1), 0, writeLimit);
+ }
+ }
+ }
}
Modified: webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/utils/BAAOutputStream.java
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/utils/BAAOutputStream.java?rev=659134&r1=659133&r2=659134&view=diff
==============================================================================
--- webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/utils/BAAOutputStream.java (original)
+++ webservices/commons/trunk/modules/axiom/modules/axiom-api/src/main/java/org/apache/axiom/attachments/utils/BAAOutputStream.java Thu May 22 08:27:13 2008
@@ -18,7 +18,10 @@
*/
package org.apache.axiom.attachments.utils;
+import org.apache.axiom.attachments.impl.BufferUtils;
+
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -26,13 +29,13 @@
* BAAOutputStream is like a ByteArrayOutputStream.
* A ByteArrayOutputStream stores the backing data in a byte[].
* BAAOutputStream stores the backing data in a Array of
- * 4K byte[]. Using several non-contiguous chunks reduces
+ * byte[]. Using several non-contiguous chunks reduces
* memory copy and resizing.
*/
public class BAAOutputStream extends OutputStream {
ArrayList data = new ArrayList();
- int BUFFER_SIZE = 4 * 1024;
+ final static int BUFFER_SIZE = BufferUtils.BUFFER_LEN;
int index = 0;
byte[] currBuffer = null;
public BAAOutputStream() {
@@ -78,4 +81,38 @@
public int length() {
return (BUFFER_SIZE * (data.size()-1)) + index;
}
+
+ /**
+ * @param is InputStream containing data
+ * @param maxRead the maximum number of bytes to receive
+ * @return bytesReceived
+ */
+ public long receive(InputStream is, long maxRead) throws IOException {
+ long bytesReceived = 0;
+
+ // Now directly write to the buffers
+ boolean done = false;
+ while (!done) {
+
+ // Don't get more than will fit in the current buffer
+ int len = (int) Math.min(BUFFER_SIZE - index, maxRead-bytesReceived);
+
+ // Now get the bytes
+ int bytesRead = is.read(currBuffer, index, len);
+ if (bytesRead >= 0) {
+ bytesReceived += bytesRead;
+ index += bytesRead;
+ if (index >= BUFFER_SIZE) {
+ addBuffer();
+ }
+ if (bytesReceived >= maxRead) {
+ done = true;
+ }
+ } else {
+ done = true;
+ }
+ }
+
+ return bytesReceived;
+ }
}
Copied: webservices/commons/trunk/modules/axiom/modules/axiom-tests/src/test/java/org/apache/axiom/attachments/impl/BufferUtilsTest.java (from r652071, webservices/commons/trunk/modules/axiom/modules/axiom-tests/src/test/java/org/apache/axiom/attachments/impl/BufferUtilsTests.java)
URL: http://svn.apache.org/viewvc/webservices/commons/trunk/modules/axiom/modules/axiom-tests/src/test/java/org/apache/axiom/attachments/impl/BufferUtilsTest.java?p2=webservices/commons/trunk/modules/axiom/modules/axiom-tests/src/test/java/org/apache/axiom/attachments/impl/BufferUtilsTest.java&p1=webservices/commons/trunk/modules/axiom/modules/axiom-tests/src/test/java/org/apache/axiom/attachments/impl/BufferUtilsTests.java&r1=652071&r2=659134&rev=659134&view=diff
==============================================================================
--- webservices/commons/trunk/modules/axiom/modules/axiom-tests/src/test/java/org/apache/axiom/attachments/impl/BufferUtilsTests.java (original)
+++ webservices/commons/trunk/modules/axiom/modules/axiom-tests/src/test/java/org/apache/axiom/attachments/impl/BufferUtilsTest.java Thu May 22 08:27:13 2008
@@ -18,23 +18,26 @@
*/
package org.apache.axiom.attachments.impl;
+import javax.activation.DataHandler;
+import javax.activation.FileDataSource;
+import javax.mail.util.ByteArrayDataSource;
+
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
+import java.io.FilterInputStream;
+import java.io.FilterOutputStream;
import java.io.IOException;
-
-import javax.activation.DataHandler;
-import javax.activation.FileDataSource;
-import javax.activation.MimeType;
-import javax.mail.util.ByteArrayDataSource;
+import java.io.InputStream;
+import java.io.OutputStream;
import junit.framework.TestCase;
/**
* Simple test for the BufferUtils copying code
*/
-public class BufferUtilsTests extends TestCase {
+public class BufferUtilsTest extends TestCase {
byte[] bytes;
static final int MAX = 1024 * 1024;
@@ -55,17 +58,15 @@
public void test() throws Exception {
// Create temp file
File file = File.createTempFile("bufferUtils", "tst");
+ file.deleteOnExit();
try {
- //System.out.println(file.getCanonicalPath());
- //System.out.println(file.getName());
- FileOutputStream fos = new FileOutputStream(file, true);
+ OutputStream fos = new FileOutputStream(file, true);
for (int i = 0; i < 20; i++) {
- //long start = System.currentTimeMillis();
- ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ long start = System.currentTimeMillis();
+ InputStream bais = new ByteArrayInputStream(bytes);
BufferUtils.inputStream2OutputStream(bais, fos);
fos.flush();
- //long end = System.currentTimeMillis();
- //System.out.println(end - start);
+ long end = System.currentTimeMillis();
}
fos.close();
@@ -73,17 +74,15 @@
byte[] buffer = new byte[20];
fis.read(buffer);
for (int i = 0; i < buffer.length; i++) {
- //System.out.println(buffer[i]);
assertTrue(buffer[i] == (byte) i);
}
} finally {
file.delete();
- }
- file.delete();
+ }
}
public void testDataSourceBackedDataHandlerExceedLimit(){
- String imgFileLocation="modules/axiom-tests/test-resources/mtom/img/test2.jpg";
+ String imgFileLocation="test-resources/mtom/img/test2.jpg";
try{
String baseDir = new File(System.getProperty("basedir",".")).getCanonicalPath();
imgFileLocation = new File(baseDir +File.separator+ imgFileLocation).getAbsolutePath();
@@ -103,6 +102,7 @@
assertEquals(doesNotExceed, 0);
}
+
public void testObjectBackedDataHandlerExceedLimit(){
String str = "This is a test String";
try{