You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2010/03/24 18:56:56 UTC

svn commit: r927138 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io: BufferedInputStream.cpp BufferedInputStream.h

Author: tabish
Date: Wed Mar 24 17:56:55 2010
New Revision: 927138

URL: http://svn.apache.org/viewvc?rev=927138&view=rev
Log:
Rewrite BufferedInputStream for better performance.  

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp?rev=927138&r1=927137&r2=927138&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.cpp Wed Mar 24 17:56:55 2010
@@ -28,166 +28,32 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-namespace decaf{
-namespace io{
+BufferedInputStream::BufferedInputStream( InputStream* stream, bool own ) :
+    FilterInputStream( stream, own ),
+    pos(0), count(0), markLimit(-1), markPos(-1), bufferSize(8192),
+    buff( new unsigned char[bufferSize] ), deleteBuff(NULL) {
 
-    class StreamBuffer {
-    private:
-
-        unsigned char* buffer;
-        int bufferSize;
-        int pos;
-        int count;
-        int markLimit;
-        int markPos;
-
-    public:
-
-        StreamBuffer( int bufferSize ) {
-
-            this->buffer = new unsigned char[bufferSize];
-            this->bufferSize = bufferSize;
-            this->pos = 0;
-            this->count = 0;
-            this->markLimit = 0;
-            this->markPos = -1;
-        }
-
-        ~StreamBuffer() {
-            delete [] this->buffer;
-        }
-
-        void resize( int newSize ) {
-            unsigned char* temp = new unsigned char[newSize];
-            System::arraycopy( temp, 0, buffer, 0, count );
-            std::swap( temp, buffer );
-            delete [] temp;
-            this->bufferSize = newSize;
-        }
-
-        int getUnusedBytes() const{
-            return bufferSize - count;
-        }
-
-        int available() const {
-            return this->count - this->pos;
-        }
-
-        unsigned char next() {
-            return this->buffer[this->pos++];
-        }
-
-        void advance( int amount ) {
-            this->pos += amount;
-        }
-
-        void reverse( int amount ) {
-            this->pos -= amount;
-        }
-
-        void advanceTail( int amount ) {
-            this->count += amount;
-        }
-
-        int getBufferSize() {
-            return this->bufferSize;
-        }
-
-        unsigned char* getBuffer() {
-            return this->buffer;
-        }
-
-        int getCount() const{
-            return count;
-        }
-
-        void setCount( int count ) {
-            this->count = count;
-        }
-
-        int getPos() const{
-            return pos;
-        }
-
-        void setPos( int pos ) {
-            this->pos = pos;
-        }
-
-        void clear(){
-            pos = count = 0;
-        }
-
-        bool isEmpty() const{
-            return pos == count;
-        }
-
-        bool isMarked() const{
-            return this->markPos != -1;
-        }
-
-        void reset() {
-            this->pos = this->markPos;
-        }
-
-        void normalizeBuffer() {
-            if( isEmpty() ){
-                clear();
-            }
-        }
-
-        void mark( int readLimit ) {
-            this->markLimit = readLimit;
-            this->markPos = this->pos;
-        }
-
-        void unmark() {
-            this->markLimit = 0;
-            this->markPos = -1;
-        }
-
-        bool isReadLimitExceeded() const {
-            return pos - markPos >= markLimit;
-        }
-
-        int getMarkPos() const {
-            return this->markPos;
-        }
-
-        void setMarkPos( int markPos ) {
-            this->markPos = markPos;
-        }
-
-        int getMarkLimit() const {
-            return this->markLimit;
-        }
-    };
-
-}}
-
-////////////////////////////////////////////////////////////////////////////////
-BufferedInputStream::BufferedInputStream( InputStream* stream, bool own )
-: FilterInputStream( stream, own ) {
-
-    // Default to a 8k buffer.
-    this->buffer.reset( new StreamBuffer( 8192 ) );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 BufferedInputStream::BufferedInputStream( InputStream* stream, int bufferSize, bool own )
-    throw ( lang::exceptions::IllegalArgumentException ) : FilterInputStream( stream, own ) {
+    throw ( lang::exceptions::IllegalArgumentException ) :
+        FilterInputStream( stream, own ),
+        pos(0), count(0), markLimit(-1), markPos(-1), bufferSize(bufferSize), buff(NULL), deleteBuff(NULL) {
 
     if( bufferSize < 0 ) {
         throw new IllegalArgumentException(
             __FILE__, __LINE__, "Size must be greater than zero");
     }
 
-    this->buffer.reset( new StreamBuffer( bufferSize ) );
+    this->buff = new unsigned char[bufferSize];
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 BufferedInputStream::~BufferedInputStream() {
     try{
         this->close();
+        delete this->deleteBuff;
     }
     DECAF_CATCH_NOTHROW( IOException )
     DECAF_CATCHALL_NOTHROW()
@@ -201,44 +67,46 @@ void BufferedInputStream::close() throw(
 
     // Free the class reference, read operation may still be
     // holding onto the buffer while blocked.
-    this->buffer.reset( NULL );
+    std::swap( this->buff, this->deleteBuff );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 int BufferedInputStream::available() const throw ( IOException ) {
 
-    if( buffer == NULL || this->isClosed() ) {
+    if( buff == NULL || this->isClosed() ) {
         throw IOException(
             __FILE__, __LINE__,
             "BufferedInputStream::available - Buffer was closed");
     }
 
-    return buffer->available() + inputStream->available();
+    return ( this->count - this->pos ) + inputStream->available();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void BufferedInputStream::mark( int readLimit ) {
-    if( this->buffer != NULL ) {
-        this->buffer->mark( readLimit );
+
+    if( this->buff != NULL ) {
+        this->markLimit = readLimit;
+        this->markPos = this->pos;
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void BufferedInputStream::reset() throw ( IOException ) {
 
-    if( this->buffer == NULL ) {
+    if( this->buff == NULL ) {
         throw IOException(
             __FILE__, __LINE__,
             "BufferedInputStream::reset - This stream has been closed." );
     }
 
-    if( !this->buffer->isMarked() ) {
+    if( this->markPos == -1 ) {
         throw IOException(
             __FILE__, __LINE__,
             "BufferedInputStream::reset - The mark position was invalidated." );
     }
 
-    this->buffer->reset();
+    this->pos = this->markPos;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -248,7 +116,7 @@ int BufferedInputStream::doReadByte() th
 
         // Use a local reference in case of unsynchronized close.
         InputStream* inputStream = this->inputStream;
-        Pointer<StreamBuffer> buffer = this->buffer;
+        unsigned char* buffer = this->buff;
 
         if( isClosed() || buffer == NULL ){
             throw IOException(
@@ -257,7 +125,7 @@ int BufferedInputStream::doReadByte() th
         }
 
         // Are there buffered bytes available?  Or can we read more?
-        if( buffer->isEmpty() && bufferData( inputStream, buffer ) == -1 ) {
+        if( this->pos >= this->count && bufferData( inputStream, buffer ) == -1 ) {
             return -1;
         }
 
@@ -268,8 +136,8 @@ int BufferedInputStream::doReadByte() th
                 "BufferedInputStream::bufferData - Stream is closed" );
         }
 
-        if( !buffer->isEmpty() ) {
-            return buffer->next();
+        if( this->pos != this->count ) {
+            return buffer[this->pos++];;
         }
 
         return -1;
@@ -279,8 +147,7 @@ int BufferedInputStream::doReadByte() th
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int BufferedInputStream::doReadArrayBounded( unsigned char* buffer, int size,
-                                             int offset, int length )
+int BufferedInputStream::doReadArrayBounded( unsigned char* buffer, int size, int offset, int length )
     throw ( decaf::io::IOException,
             decaf::lang::exceptions::IndexOutOfBoundsException,
             decaf::lang::exceptions::NullPointerException ) {
@@ -288,17 +155,16 @@ int BufferedInputStream::doReadArrayBoun
     try{
 
         // Use a local reference in case of unsynchronized close.
-        Pointer<StreamBuffer> streamBuffer = this->buffer;
+        unsigned char* lbuffer = this->buff;
 
-        if( streamBuffer == NULL ){
+        if( lbuffer == NULL ){
             throw IOException(
                 __FILE__, __LINE__, "Stream is closed" );
         }
 
         if( buffer == NULL ) {
             throw NullPointerException(
-                __FILE__, __LINE__,
-                "Buffer passed was NULL." );
+                __FILE__, __LINE__, "Buffer passed was NULL." );
         }
 
         if( offset > size || offset < 0 ) {
@@ -311,11 +177,6 @@ int BufferedInputStream::doReadArrayBoun
                 __FILE__, __LINE__, "length parameter out of Bounds: %d.", length );
         }
 
-        if( buffer == NULL ) {
-            throw NullPointerException(
-                __FILE__, __LINE__, "Buffer pointer passed was NULL." );
-        }
-
         // For zero, do nothing
         if( length == 0 ) {
             return 0;
@@ -334,14 +195,13 @@ int BufferedInputStream::doReadArrayBoun
         // There are bytes available in the buffer so use them up first and
         // then we check to see if any are available on the stream, if not
         // then just return what we had.
-        if( !streamBuffer->isEmpty() ) {
+        if( this->pos < this->count  ) {
 
-            int copylength =
-                streamBuffer->available() >= length ? length : streamBuffer->available();
+            int available = this->count - this->pos;
+            int copylength = available >= length ? length : available;
 
-            System::arraycopy( streamBuffer->getBuffer(), streamBuffer->getPos(),
-                               buffer, offset, copylength );
-            streamBuffer->advance( copylength );
+            System::arraycopy( lbuffer, this->pos, buffer, offset, copylength );
+            this->pos += copylength;
 
             if( copylength == length || inputStream->available() == 0 ) {
                 return (int)copylength;
@@ -359,7 +219,7 @@ int BufferedInputStream::doReadArrayBoun
 
             // If we're not marked and the required size is greater than the
             // buffer, simply read the bytes directly bypassing the buffer.
-            if( !this->buffer->isMarked() && required >= this->buffer->getBufferSize() ) {
+            if( this->markPos == -1 && required >= this->bufferSize ) {
 
                 read = inputStream->read( buffer, size, offset, required );
                 if( read == -1 ) {
@@ -368,31 +228,31 @@ int BufferedInputStream::doReadArrayBoun
 
             } else {
 
-                if( bufferData( inputStream, streamBuffer ) == -1 ) {
+                if( bufferData( inputStream, lbuffer ) == -1 ) {
                     return required == length ? -1 : (int)( length - required );
                 }
 
                 // Stream might have closed while we were buffering.
-                if( isClosed() || this->buffer == NULL ){
+                if( isClosed() || this->buff == NULL ){
                     throw IOException(
                         __FILE__, __LINE__,
                         "BufferedInputStream::bufferData - Stream is closed" );
                 }
 
-                read = (int)( streamBuffer->available() >= required ? required : streamBuffer->available() );
-                System::arraycopy( streamBuffer->getBuffer(), streamBuffer->getPos(),
-                                   buffer, offset, read );
-                streamBuffer->advance( read );
+                int available = this->count - this->pos;
+                read = available >= required ? required : available;
+                System::arraycopy( lbuffer, this->pos, buffer, offset, read );
+                this->pos += read;
             }
 
             required -= read;
 
             if( required == 0 ) {
-                return (int)length;
+                return length;
             }
 
             if( inputStream->available() == 0 ) {
-                return (int)( length - required );
+                return length - required;
             }
 
             offset += read;
@@ -416,39 +276,39 @@ long long BufferedInputStream::skip( lon
 
         // Use a local reference in case of unsynchronized close.
         InputStream* inputStream = this->inputStream;
-        Pointer<StreamBuffer> streamBuffer = this->buffer;
+        unsigned char* lbuffer = this->buff;
 
-        if( isClosed() || streamBuffer == NULL ){
+        if( isClosed() || lbuffer == NULL ){
             throw IOException(
                 __FILE__, __LINE__,
                 "BufferedInputStream::skip - Stream is closed" );
         }
 
-        if( streamBuffer->available() >= amount ) {
-            streamBuffer->advance( (int)amount );
+        if( ( this->count - this->pos ) >= amount ) {
+            this->pos += (int)amount;
             return amount;
         }
 
-        int read = streamBuffer->available();
+        long long read = this->count - this->pos;
 
-        streamBuffer->advance( streamBuffer->getCount() );
+        this->pos = this->count;
 
-        if( streamBuffer->isMarked() ) {
+        if( this->markPos != -1 ) {
 
-            if( amount <= streamBuffer->getMarkLimit() ) {
+            if( amount <= this->markLimit ) {
 
-                if( bufferData( inputStream, streamBuffer ) == -1 ) {
+                if( bufferData( inputStream, lbuffer ) == -1 ) {
                     return read;
                 }
 
-                if( streamBuffer->available() >= ( (int)amount - read ) ) {
-                    streamBuffer->advance( (int)amount - read );
+                if( ( this->count - this->pos ) >= ( amount - read ) ) {
+                    this->pos += (int)( amount - read );
                     return amount;
                 }
 
                 // Couldn't get all the bytes, skip what we read
-                read += streamBuffer->available();
-                streamBuffer->advance( streamBuffer->getCount() );
+                read += this->count - this->pos;
+                this->pos = this->count;
 
                 return read;
             }
@@ -461,48 +321,53 @@ long long BufferedInputStream::skip( lon
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int BufferedInputStream::bufferData( InputStream* inputStream, Pointer<StreamBuffer>& buffer )
+int BufferedInputStream::bufferData( InputStream* inputStream, unsigned char*& buffer )
     throw ( decaf::io::IOException ){
 
     try{
 
-        if( !buffer->isMarked() || buffer->isReadLimitExceeded() ) {
+        if( this->markPos == -1 || pos - markPos >= markLimit ) {
             // Mark position not set or exceeded readlimit
-            int result = inputStream->read( buffer->getBuffer(), buffer->getBufferSize() );
+            int result = inputStream->read( buffer, this->bufferSize );
             if( result > 0 ) {
-                buffer->unmark();
-                buffer->clear();
-                buffer->advanceTail( result == -1 ? 0 : result );
+                this->markLimit = 0;
+                this->markPos = -1;
+                this->pos = this->count = 0;
+                this->count = result == -1 ? 0 : result;
             }
 
             return result;
         }
 
-        int markPos = buffer->getMarkPos();
-        int markLimit = buffer->getMarkLimit();
-
-        if( markPos == 0 && markLimit > buffer->getBufferSize() ) {
+        if( this->markPos == 0 && this->markLimit > this->bufferSize ) {
 
             // Increase buffer size to accommodate the readlimit.
-            int newLength = buffer->getBufferSize() * 2;
+            int newLength = this->bufferSize * 2;
             if( newLength > markLimit ) {
                 newLength = markLimit;
             }
-            buffer->resize( newLength );
-        } else if( markPos > 0 ) {
-            System::arraycopy( buffer->getBuffer(), markPos,
-                               buffer->getBuffer(), 0, buffer->getBufferSize() - markPos );
+
+            unsigned char* temp = new unsigned char[newLength];
+            System::arraycopy( temp, 0, buffer, 0, count );
+            std::swap( temp, buffer );
+            delete [] temp;
+            this->bufferSize = newLength;
+
+            if( this->buff != NULL ) {
+                this->buff = buffer;
+            }
+
+        } else if( this->markPos > 0 ) {
+            System::arraycopy( buffer, markPos, buffer, 0, this->bufferSize - markPos );
         }
 
         // Set the new position and mark position
-        buffer->reverse( markPos );
-        buffer->setCount( 0 );
-        buffer->setMarkPos( 0 );
+        this->pos -= this->markPos;
+        this->count = this->markPos = 0;
 
-        int bytesread = inputStream->read( buffer->getBuffer(), buffer->getBufferSize(),
-                                           buffer->getPos(), buffer->getBufferSize() - buffer->getPos() );
+        int bytesread = inputStream->read( buffer, this->bufferSize, this->pos, this->bufferSize - this->pos );
 
-        buffer->setCount( bytesread <= 0 ? buffer->getPos() : buffer->getPos() + bytesread );
+        this->count = bytesread <= 0 ? this->pos : this->pos + bytesread;
 
         return bytesread;
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h?rev=927138&r1=927137&r2=927138&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/io/BufferedInputStream.h Wed Mar 24 17:56:55 2010
@@ -21,13 +21,10 @@
 #include <decaf/util/Config.h>
 #include <decaf/io/FilterInputStream.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
-#include <decaf/lang/Pointer.h>
 
 namespace decaf{
 namespace io{
 
-    class StreamBuffer;
-
     /**
      * A wrapper around another input stream that performs
      * a buffered read, where it reads more data than it needs
@@ -37,10 +34,15 @@ namespace io{
     class DECAF_API BufferedInputStream : public FilterInputStream {
     private:
 
-        // Internal data buffer, uses a smart pointer so that async close
-        // operations allow read methods fail gracefully instead of segfaulting
-        // on access to invalid memory.
-        decaf::lang::Pointer<StreamBuffer> buffer;
+        int pos;
+        int count;
+        int markLimit;
+        int markPos;
+        int bufferSize;
+        unsigned char* buff;
+
+        // Holds the buffer swapped out by close() so it can be safely deleted later.
+        unsigned char* deleteBuff;
 
     private:
 
@@ -119,7 +121,7 @@ namespace io{
 
     private:
 
-        int bufferData( InputStream* stream, decaf::lang::Pointer<StreamBuffer>& buffer )
+        int bufferData( InputStream* stream, unsigned char*& buffer )
             throw ( decaf::io::IOException );
 
     };