You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/03/24 18:56:56 UTC
svn commit: r927138 - in
/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io:
BufferedInputStream.cpp BufferedInputStream.h
Author: tabish
Date: Wed Mar 24 17:56:55 2010
New Revision: 927138
URL: http://svn.apache.org/viewvc?rev=927138&view=rev
Log:
Rewrite BufferedInputStream for better performance.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp?rev=927138&r1=927137&r2=927138&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp Wed Mar 24 17:56:55 2010
@@ -28,166 +28,32 @@ using namespace decaf::lang;
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
-namespace decaf{
-namespace io{
+BufferedInputStream::BufferedInputStream( InputStream* stream, bool own ) :
+ FilterInputStream( stream, own ),
+ pos(0), count(0), markLimit(-1), markPos(-1), bufferSize(8192),
+ buff( new unsigned char[bufferSize] ), deleteBuff(NULL) {
- class StreamBuffer {
- private:
-
- unsigned char* buffer;
- int bufferSize;
- int pos;
- int count;
- int markLimit;
- int markPos;
-
- public:
-
- StreamBuffer( int bufferSize ) {
-
- this->buffer = new unsigned char[bufferSize];
- this->bufferSize = bufferSize;
- this->pos = 0;
- this->count = 0;
- this->markLimit = 0;
- this->markPos = -1;
- }
-
- ~StreamBuffer() {
- delete [] this->buffer;
- }
-
- void resize( int newSize ) {
- unsigned char* temp = new unsigned char[newSize];
- System::arraycopy( temp, 0, buffer, 0, count );
- std::swap( temp, buffer );
- delete [] temp;
- this->bufferSize = newSize;
- }
-
- int getUnusedBytes() const{
- return bufferSize - count;
- }
-
- int available() const {
- return this->count - this->pos;
- }
-
- unsigned char next() {
- return this->buffer[this->pos++];
- }
-
- void advance( int amount ) {
- this->pos += amount;
- }
-
- void reverse( int amount ) {
- this->pos -= amount;
- }
-
- void advanceTail( int amount ) {
- this->count += amount;
- }
-
- int getBufferSize() {
- return this->bufferSize;
- }
-
- unsigned char* getBuffer() {
- return this->buffer;
- }
-
- int getCount() const{
- return count;
- }
-
- void setCount( int count ) {
- this->count = count;
- }
-
- int getPos() const{
- return pos;
- }
-
- void setPos( int pos ) {
- this->pos = pos;
- }
-
- void clear(){
- pos = count = 0;
- }
-
- bool isEmpty() const{
- return pos == count;
- }
-
- bool isMarked() const{
- return this->markPos != -1;
- }
-
- void reset() {
- this->pos = this->markPos;
- }
-
- void normalizeBuffer() {
- if( isEmpty() ){
- clear();
- }
- }
-
- void mark( int readLimit ) {
- this->markLimit = readLimit;
- this->markPos = this->pos;
- }
-
- void unmark() {
- this->markLimit = 0;
- this->markPos = -1;
- }
-
- bool isReadLimitExceeded() const {
- return pos - markPos >= markLimit;
- }
-
- int getMarkPos() const {
- return this->markPos;
- }
-
- void setMarkPos( int markPos ) {
- this->markPos = markPos;
- }
-
- int getMarkLimit() const {
- return this->markLimit;
- }
- };
-
-}}
-
-////////////////////////////////////////////////////////////////////////////////
-BufferedInputStream::BufferedInputStream( InputStream* stream, bool own )
-: FilterInputStream( stream, own ) {
-
- // Default to a 8k buffer.
- this->buffer.reset( new StreamBuffer( 8192 ) );
}
////////////////////////////////////////////////////////////////////////////////
BufferedInputStream::BufferedInputStream( InputStream* stream, int bufferSize, bool own )
- throw ( lang::exceptions::IllegalArgumentException ) : FilterInputStream( stream, own ) {
+ throw ( lang::exceptions::IllegalArgumentException ) :
+ FilterInputStream( stream, own ),
+ pos(0), count(0), markLimit(-1), markPos(-1), bufferSize(bufferSize), buff(NULL), deleteBuff(NULL) {
if( bufferSize < 0 ) {
throw new IllegalArgumentException(
__FILE__, __LINE__, "Size must be greater than zero");
}
- this->buffer.reset( new StreamBuffer( bufferSize ) );
+ this->buff = new unsigned char[bufferSize];
}
////////////////////////////////////////////////////////////////////////////////
BufferedInputStream::~BufferedInputStream() {
try{
this->close();
+ delete this->deleteBuff;
}
DECAF_CATCH_NOTHROW( IOException )
DECAF_CATCHALL_NOTHROW()
@@ -201,44 +67,46 @@ void BufferedInputStream::close() throw(
// Free the class reference, read operation may still be
// holding onto the buffer while blocked.
- this->buffer.reset( NULL );
+ std::swap( this->buff, this->deleteBuff );
}
////////////////////////////////////////////////////////////////////////////////
int BufferedInputStream::available() const throw ( IOException ) {
- if( buffer == NULL || this->isClosed() ) {
+ if( buff == NULL || this->isClosed() ) {
throw IOException(
__FILE__, __LINE__,
"BufferedInputStream::available - Buffer was closed");
}
- return buffer->available() + inputStream->available();
+ return ( this->count - this->pos ) + inputStream->available();
}
////////////////////////////////////////////////////////////////////////////////
void BufferedInputStream::mark( int readLimit ) {
- if( this->buffer != NULL ) {
- this->buffer->mark( readLimit );
+
+ if( this->buff != NULL ) {
+ this->markLimit = readLimit;
+ this->markPos = this->pos;
}
}
////////////////////////////////////////////////////////////////////////////////
void BufferedInputStream::reset() throw ( IOException ) {
- if( this->buffer == NULL ) {
+ if( this->buff == NULL ) {
throw IOException(
__FILE__, __LINE__,
"BufferedInputStream::reset - This stream has been closed." );
}
- if( !this->buffer->isMarked() ) {
+ if( this->markPos == -1 ) {
throw IOException(
__FILE__, __LINE__,
"BufferedInputStream::reset - The mark position was invalidated." );
}
- this->buffer->reset();
+ this->pos = this->markPos;
}
////////////////////////////////////////////////////////////////////////////////
@@ -248,7 +116,7 @@ int BufferedInputStream::doReadByte() th
// Use a local reference in case of unsynchronized close.
InputStream* inputStream = this->inputStream;
- Pointer<StreamBuffer> buffer = this->buffer;
+ unsigned char* buffer = this->buff;
if( isClosed() || buffer == NULL ){
throw IOException(
@@ -257,7 +125,7 @@ int BufferedInputStream::doReadByte() th
}
// Are there buffered bytes available? Or can we read more?
- if( buffer->isEmpty() && bufferData( inputStream, buffer ) == -1 ) {
+ if( this->pos >= this->count && bufferData( inputStream, buffer ) == -1 ) {
return -1;
}
@@ -268,8 +136,8 @@ int BufferedInputStream::doReadByte() th
"BufferedInputStream::bufferData - Stream is closed" );
}
- if( !buffer->isEmpty() ) {
- return buffer->next();
+ if( this->pos != this->count ) {
+ return buffer[this->pos++];;
}
return -1;
@@ -279,8 +147,7 @@ int BufferedInputStream::doReadByte() th
}
////////////////////////////////////////////////////////////////////////////////
-int BufferedInputStream::doReadArrayBounded( unsigned char* buffer, int size,
- int offset, int length )
+int BufferedInputStream::doReadArrayBounded( unsigned char* buffer, int size, int offset, int length )
throw ( decaf::io::IOException,
decaf::lang::exceptions::IndexOutOfBoundsException,
decaf::lang::exceptions::NullPointerException ) {
@@ -288,17 +155,16 @@ int BufferedInputStream::doReadArrayBoun
try{
// Use a local reference in case of unsynchronized close.
- Pointer<StreamBuffer> streamBuffer = this->buffer;
+ unsigned char* lbuffer = this->buff;
- if( streamBuffer == NULL ){
+ if( lbuffer == NULL ){
throw IOException(
__FILE__, __LINE__, "Stream is closed" );
}
if( buffer == NULL ) {
throw NullPointerException(
- __FILE__, __LINE__,
- "Buffer passed was NULL." );
+ __FILE__, __LINE__, "Buffer passed was NULL." );
}
if( offset > size || offset < 0 ) {
@@ -311,11 +177,6 @@ int BufferedInputStream::doReadArrayBoun
__FILE__, __LINE__, "length parameter out of Bounds: %d.", length );
}
- if( buffer == NULL ) {
- throw NullPointerException(
- __FILE__, __LINE__, "Buffer pointer passed was NULL." );
- }
-
// For zero, do nothing
if( length == 0 ) {
return 0;
@@ -334,14 +195,13 @@ int BufferedInputStream::doReadArrayBoun
// There are bytes available in the buffer so use them up first and
// then we check to see if any are available on the stream, if not
// then just return what we had.
- if( !streamBuffer->isEmpty() ) {
+ if( this->pos < this->count ) {
- int copylength =
- streamBuffer->available() >= length ? length : streamBuffer->available();
+ int available = this->count - this->pos;
+ int copylength = available >= length ? length : available;
- System::arraycopy( streamBuffer->getBuffer(), streamBuffer->getPos(),
- buffer, offset, copylength );
- streamBuffer->advance( copylength );
+ System::arraycopy( lbuffer, this->pos, buffer, offset, copylength );
+ this->pos += copylength;
if( copylength == length || inputStream->available() == 0 ) {
return (int)copylength;
@@ -359,7 +219,7 @@ int BufferedInputStream::doReadArrayBoun
// If we're not marked and the required size is greater than the
// buffer, simply read the bytes directly bypassing the buffer.
- if( !this->buffer->isMarked() && required >= this->buffer->getBufferSize() ) {
+ if( this->markPos == -1 && required >= this->bufferSize ) {
read = inputStream->read( buffer, size, offset, required );
if( read == -1 ) {
@@ -368,31 +228,31 @@ int BufferedInputStream::doReadArrayBoun
} else {
- if( bufferData( inputStream, streamBuffer ) == -1 ) {
+ if( bufferData( inputStream, lbuffer ) == -1 ) {
return required == length ? -1 : (int)( length - required );
}
// Stream might have closed while we were buffering.
- if( isClosed() || this->buffer == NULL ){
+ if( isClosed() || this->buff == NULL ){
throw IOException(
__FILE__, __LINE__,
"BufferedInputStream::bufferData - Stream is closed" );
}
- read = (int)( streamBuffer->available() >= required ? required : streamBuffer->available() );
- System::arraycopy( streamBuffer->getBuffer(), streamBuffer->getPos(),
- buffer, offset, read );
- streamBuffer->advance( read );
+ int available = this->count - this->pos;
+ read = available >= required ? required : available;
+ System::arraycopy( lbuffer, this->pos, buffer, offset, read );
+ this->pos += read;
}
required -= read;
if( required == 0 ) {
- return (int)length;
+ return length;
}
if( inputStream->available() == 0 ) {
- return (int)( length - required );
+ return length - required;
}
offset += read;
@@ -416,39 +276,39 @@ long long BufferedInputStream::skip( lon
// Use a local reference in case of unsynchronized close.
InputStream* inputStream = this->inputStream;
- Pointer<StreamBuffer> streamBuffer = this->buffer;
+ unsigned char* lbuffer = this->buff;
- if( isClosed() || streamBuffer == NULL ){
+ if( isClosed() || lbuffer == NULL ){
throw IOException(
__FILE__, __LINE__,
"BufferedInputStream::skip - Stream is closed" );
}
- if( streamBuffer->available() >= amount ) {
- streamBuffer->advance( (int)amount );
+ if( ( this->count - this->pos ) >= amount ) {
+ this->pos += (int)amount;
return amount;
}
- int read = streamBuffer->available();
+ long long read = this->count - this->pos;
- streamBuffer->advance( streamBuffer->getCount() );
+ this->pos = this->count;
- if( streamBuffer->isMarked() ) {
+ if( this->markPos != -1 ) {
- if( amount <= streamBuffer->getMarkLimit() ) {
+ if( amount <= this->markLimit ) {
- if( bufferData( inputStream, streamBuffer ) == -1 ) {
+ if( bufferData( inputStream, lbuffer ) == -1 ) {
return read;
}
- if( streamBuffer->available() >= ( (int)amount - read ) ) {
- streamBuffer->advance( (int)amount - read );
+ if( ( this->count - this->pos ) >= ( amount - read ) ) {
+ this->pos += (int)( amount - read );
return amount;
}
// Couldn't get all the bytes, skip what we read
- read += streamBuffer->available();
- streamBuffer->advance( streamBuffer->getCount() );
+ read += this->count - this->pos;
+ this->pos = this->count;
return read;
}
@@ -461,48 +321,53 @@ long long BufferedInputStream::skip( lon
}
////////////////////////////////////////////////////////////////////////////////
-int BufferedInputStream::bufferData( InputStream* inputStream, Pointer<StreamBuffer>& buffer )
+int BufferedInputStream::bufferData( InputStream* inputStream, unsigned char*& buffer )
throw ( decaf::io::IOException ){
try{
- if( !buffer->isMarked() || buffer->isReadLimitExceeded() ) {
+ if( this->markPos == -1 || pos - markPos >= markLimit ) {
// Mark position not set or exceeded readlimit
- int result = inputStream->read( buffer->getBuffer(), buffer->getBufferSize() );
+ int result = inputStream->read( buffer, this->bufferSize );
if( result > 0 ) {
- buffer->unmark();
- buffer->clear();
- buffer->advanceTail( result == -1 ? 0 : result );
+ this->markLimit = 0;
+ this->markPos = -1;
+ this->pos = this->count = 0;
+ this->count = result == -1 ? 0 : result;
}
return result;
}
- int markPos = buffer->getMarkPos();
- int markLimit = buffer->getMarkLimit();
-
- if( markPos == 0 && markLimit > buffer->getBufferSize() ) {
+ if( this->markPos == 0 && this->markLimit > this->bufferSize ) {
// Increase buffer size to accommodate the readlimit.
- int newLength = buffer->getBufferSize() * 2;
+ int newLength = this->bufferSize * 2;
if( newLength > markLimit ) {
newLength = markLimit;
}
- buffer->resize( newLength );
- } else if( markPos > 0 ) {
- System::arraycopy( buffer->getBuffer(), markPos,
- buffer->getBuffer(), 0, buffer->getBufferSize() - markPos );
+
+ unsigned char* temp = new unsigned char[newLength];
+ System::arraycopy( temp, 0, buffer, 0, count );
+ std::swap( temp, buffer );
+ delete [] temp;
+ this->bufferSize = newLength;
+
+ if( this->buff != NULL ) {
+ this->buff = buffer;
+ }
+
+ } else if( this->markPos > 0 ) {
+ System::arraycopy( buffer, markPos, buffer, 0, this->bufferSize - markPos );
}
// Set the new position and mark position
- buffer->reverse( markPos );
- buffer->setCount( 0 );
- buffer->setMarkPos( 0 );
+ this->pos -= this->markPos;
+ this->count = this->markPos = 0;
- int bytesread = inputStream->read( buffer->getBuffer(), buffer->getBufferSize(),
- buffer->getPos(), buffer->getBufferSize() - buffer->getPos() );
+ int bytesread = inputStream->read( buffer, this->bufferSize, this->pos, this->bufferSize - this->pos );
- buffer->setCount( bytesread <= 0 ? buffer->getPos() : buffer->getPos() + bytesread );
+ this->count = bytesread <= 0 ? this->pos : this->pos + bytesread;
return bytesread;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h?rev=927138&r1=927137&r2=927138&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h Wed Mar 24 17:56:55 2010
@@ -21,13 +21,10 @@
#include <decaf/util/Config.h>
#include <decaf/io/FilterInputStream.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
-#include <decaf/lang/Pointer.h>
namespace decaf{
namespace io{
- class StreamBuffer;
-
/**
* A wrapper around another input stream that performs
* a buffered read, where it reads more data than it needs
@@ -37,10 +34,15 @@ namespace io{
class DECAF_API BufferedInputStream : public FilterInputStream {
private:
- // Internal data buffer, uses a smart pointer so that async close
- // operations allow read methods fail gracefully instead of segfaulting
- // on access to invalid memory.
- decaf::lang::Pointer<StreamBuffer> buffer;
+ int pos;
+ int count;
+ int markLimit;
+ int markPos;
+ int bufferSize;
+ unsigned char* buff;
+
+ // Used to swap the active buffer with so it can be safely deleted later.
+ unsigned char* deleteBuff;
private:
@@ -119,7 +121,7 @@ namespace io{
private:
- int bufferData( InputStream* stream, decaf::lang::Pointer<StreamBuffer>& buffer )
+ int bufferData( InputStream* stream, unsigned char*& buffer )
throw ( decaf::io::IOException );
};