You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2006/10/02 15:27:49 UTC

svn commit: r452032 - in /incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io: EOFException.h FilterInputStream.h FilterOutputStream.h InputStream.h

Author: tabish
Date: Mon Oct  2 06:27:48 2006
New Revision: 452032

URL: http://svn.apache.org/viewvc?view=rev&rev=452032
Log:
Adding new code for Openwire Support

Added:
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h   (with props)
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h   (with props)
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h   (with props)
Modified:
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h

Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h?view=auto&rev=452032
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h (added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h Mon Oct  2 06:27:48 2006
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_EOFEXCEPTION_H
+#define ACTIVEMQ_IO_EOFEXCEPTION_H
+
+#include <activemq/io/IOException.h>
+
+namespace activemq{
+namespace io{
+
+    /*
+     * Signals that an End of File exception has occurred.
+     */
+    class EOFException : public io::IOException
+    {
+    public:
+
+        /**
+         * Default Constructor
+         */
+        EOFException(){}
+
+        /**
+         * Copy Constructor
+         * @param ex the exception to copy
+         */
+        EOFException( const exceptions::ActiveMQException& ex ){
+            *(exceptions::ActiveMQException*)this = ex;
+        }
+        
+        /**
+         * Copy Constructor
+         * @param ex the exception to copy, which is an instance of this type
+         */
+        EOFException( const EOFException& ex ){
+            *(exceptions::ActiveMQException*)this = ex;
+        }
+
+        /**
+         * Consturctor
+         * @param file name of the file were the exception occured.
+         * @param lineNumber line where the exception occured
+         * @param msg the message that was generated
+         */
+        EOFException( const char* file, const int lineNumber,
+                      const char* msg, ... )
+        {
+            va_list vargs;
+            va_start( vargs, msg );
+            buildMessage( msg, vargs );
+            
+            // Set the first mark for this exception.
+            setMark( file, lineNumber );
+        }
+        
+        /**
+         * Clones this exception.  This is useful for cases where you need
+         * to preserve the type of the original exception as well as the message.
+         * All subclasses should override.
+         */
+        virtual exceptions::ActiveMQException* clone() const{
+            return new EOFException( *this );
+        }
+        
+        virtual ~EOFException(){}
+        
+    };
+
+}}
+
+#endif /*ACTIVEMQ_IO_EOFEXCEPTION_H*/

Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h?view=auto&rev=452032
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h (added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h Mon Oct  2 06:27:48 2006
@@ -0,0 +1,200 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef _ACTIVEMQ_IO_FILTERINPUTSTREAM_H_
+#define _ACTIVEMQ_IO_FILTERINPUTSTREAM_H_
+
+#include <activemq/io/InputStream.h>
+#include <activemq/io/IOException.h>
+#include <activemq/concurrent/Mutex.h>
+
+namespace activemq{
+namespace io{
+
+    /** 
+     * A FilterInputStream contains some other input stream, which it uses 
+     * as its basic source of data, possibly transforming the data along the 
+     * way or providing additional functionality. The class FilterInputStream
+     * itself simply overrides all methods of InputStream with versions 
+     * that pass all requests to the contained input stream. Subclasses of
+     * FilterInputStream  may further override some of these methods and may 
+     * also provide additional methods and fields.
+     */
+    class FilterInputStream : public InputStream
+    {
+    protected:
+    
+        // The input stream to wrap
+        InputStream* inputStream;
+        
+        // Synchronization object.
+        concurrent::Mutex mutex;
+        
+        // Indicates if we own the wrapped stream
+        bool own;
+
+    public:
+    
+        /**
+         * Constructor to create a wrapped InputStream
+         * @param inputStream the stream to wrap and filter
+         * @param own indicates if we own the stream object, defaults to false
+         */ 
+        FilterInputStream( InputStream* inputStream, bool own = false ) {
+            this->inputStream = inputStream;
+            this->own = own;
+        }
+        
+        virtual ~FilterInputStream() {
+            try {
+                if( own == true ) delete inputStream;
+            }
+            AMQ_CATCH_NOTHROW( IOException )
+            AMQ_CATCHALL_NOTHROW( )     
+        }
+        
+        /**
+         * Returns the number of bytes that can be read from this input stream 
+         * without blocking.  This method simply performs in.available() and 
+         * returns the result. 
+         * @return the number of bytes available without blocking.
+         */
+        virtual int available() const throw ( IOException ) {
+            try {
+                return inputStream->available();
+            }
+            AMQ_CATCH_RETHROW( IOException )
+            AMQ_CATCHALL_THROW( IOException )
+        }
+
+        /**
+         * Reads the next byte of data from this input stream. The value byte 
+         * is returned as an unsigned char in the range 0 to 255. If no byte is
+         * available because the end of the stream has been reached, the value 
+         * -1 is returned. This method blocks until input data is available, 
+         * the end of the stream is detected, or an exception is thrown.
+         * This method simply performs in.read() and returns the result. 
+         * @return The next byte.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual unsigned char read() throw ( IOException ) {
+            try {
+                return inputStream->read();
+            }
+            AMQ_CATCH_RETHROW( IOException )
+            AMQ_CATCHALL_THROW( IOException )
+        }
+        
+        /**
+         * Reads up to len bytes of data from this input stream into an array 
+         * of bytes. This method blocks until some input is available.
+         * This method simply performs in.read(b, len) and returns the result. 
+         * @param buffer (out) the target buffer.
+         * @param bufferSize the size of the output buffer.
+         * @return The number of bytes read or -1 if EOS is detected
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual int read( unsigned char* buffer, const int bufferSize ) 
+            throw ( IOException )
+        {
+            try {
+                return inputStream->read( buffer, bufferSize );
+            }
+            AMQ_CATCH_RETHROW( IOException )
+            AMQ_CATCHALL_THROW( IOException )
+        }
+        
+        /**
+         * Close the Stream, the FilterOutputStream simply calls the close
+         * method of the underlying stream
+         * @throws CMSException
+         */
+        virtual void close() throw ( cms::CMSException ) {
+            try {
+                inputStream->close();
+            }
+            AMQ_CATCH_RETHROW( IOException )
+            AMQ_CATCHALL_THROW( IOException )
+        }
+
+    public:  // Synchronizable
+    
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to Notify.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void lock() throw( exceptions::ActiveMQException ){
+            mutex.lock();
+        }
+    
+        /**
+         * Unlocks the object.
+         * @throws ActiveMQException
+         */
+        virtual void unlock() throw( exceptions::ActiveMQException ){ 
+            mutex.unlock();
+        }
+        
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to Notify.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void wait() throw( exceptions::ActiveMQException ){
+            mutex.wait();
+        }
+    
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to Notify.  Must have this object locked before
+         * calling.  This wait will timeout after the specified time
+         * interval.
+         * @param millisecs the time in millisecsonds to wait, or WAIT_INIFINITE
+         * @throws ActiveMQException
+         */
+        virtual void wait( unsigned long millisecs ) throw( exceptions::ActiveMQException ){
+            mutex.wait(millisecs);
+        }
+
+        /**
+         * Signals a waiter on this object that it can now wake
+         * up and continue.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notify() throw( exceptions::ActiveMQException ){
+            mutex.notify();
+        }
+        
+        /**
+         * Signals the waiters on this object that it can now wake
+         * up and continue.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notifyAll() throw( exceptions::ActiveMQException ){
+            mutex.notifyAll();
+        }
+    
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_IO_FILTERINPUTSTREAM_H_*/

Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h?view=auto&rev=452032
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h (added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h Mon Oct  2 06:27:48 2006
@@ -0,0 +1,207 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_IO_FILTEROUTPUTSTREAM_H_
+#define _ACTIVEMQ_IO_FILTEROUTPUTSTREAM_H_
+
+#include <activemq/io/OutputStream.h>
+#include <activemq/io/IOException.h>
+#include <activemq/concurrent/Mutex.h>
+
+namespace activemq{
+namespace io{
+
+    /**
+     * This class is the superclass of all classes that filter output 
+     * streams. These streams sit on top of an already existing output 
+     * stream (the underlying output stream) which it uses as its basic 
+     * sink of data, but possibly transforming the data along the way or 
+     * providing additional functionality.
+     * 
+     * The class FilterOutputStream itself simply overrides all methods of 
+     * OutputStream with versions that pass all requests to the underlying 
+     * output stream. Subclasses of FilterOutputStream may further override 
+     * some of these methods as well as provide additional methods and 
+     * fields.
+     * 
+     * Due to the lack of garbage collection in C++ a design decision was 
+     * made to add a boolean parameter to the constructor indicating if the
+     * wrapped <code>InputStream</code> is owned by this object.  That way
+     * creation of the underlying stream can occur in a Java like way. Ex:
+     * 
+     *  DataOutputStream os = new DataOutputStream( new OutputStream(), true )
+     */
+    class FilterOutputStream : public OutputStream
+    {
+    protected:
+    
+        // The output Stream to wrap
+        OutputStream* outputStream;
+        
+        // Synchronization object.
+        concurrent::Mutex mutex;
+        
+        // Indicates if we own the wrapped stream
+        bool own;
+
+    public:
+
+        /**
+         * Constructor, creates a wrapped output stream
+         * @param outputStream the OutputStream to wrap
+         */
+    	FilterOutputStream( OutputStream* outputStream, bool own = false ){
+            this->outputStream = outputStream;
+            this->own = own;
+        }
+        
+    	virtual ~FilterOutputStream() {
+            try {
+                if( own == true ) delete outputStream;
+            }
+            AMQ_CATCH_NOTHROW( IOException )
+            AMQ_CATCHALL_NOTHROW( )                 
+        }
+
+        /**
+         * Writes a single byte to the output stream.  The write method of 
+         * FilterOutputStream calls the write method of its underlying output 
+         * stream, that is, it performs out.write(b).
+         * @param c the byte.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual void write( const unsigned char c ) throw ( IOException ) {
+            try {
+                outputStream->write( c );
+            }
+            AMQ_CATCH_RETHROW( IOException )
+            AMQ_CATCHALL_THROW( IOException )
+        }
+        
+        /**
+         * Writes an array of bytes to the output stream.  The write method of 
+         * FilterOutputStream calls the write method of one argument on each 
+         * byte to output.
+         * @param buffer The array of bytes to write.
+         * @param len The number of bytes from the buffer to be written.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual void write( const unsigned char* buffer, const int len ) throw ( IOException ) {
+            try {
+                for( int ix = 0; ix < len; ++ix )
+                {
+                    outputStream->write( buffer[ix] );
+                }
+            }
+            AMQ_CATCH_RETHROW( IOException )
+            AMQ_CATCHALL_THROW( IOException )
+        }
+        
+        /**
+         * Flushes any pending writes in this output stream.
+         * The flush method of FilterOutputStream calls the flush method 
+         * of its underlying output stream
+         * @throws IOException
+         */
+        virtual void flush() throw ( IOException ) {
+            try {
+                outputStream->flush();
+            }
+            AMQ_CATCH_RETHROW( IOException )
+            AMQ_CATCHALL_THROW( IOException )
+        }
+        
+        /**
+         * Close the Stream, the FilterOutputStream simply calls the close
+         * method of the underlying stream
+         * @throws CMSException
+         */
+        virtual void close() throw ( cms::CMSException ) {
+            try {
+                outputStream->close();
+            }
+            AMQ_CATCH_RETHROW( IOException )
+            AMQ_CATCHALL_THROW( IOException )
+        }
+
+    public:  // Synchronizable
+    
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to Notify.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void lock() throw( exceptions::ActiveMQException ){
+            mutex.lock();
+        }
+    
+        /**
+         * Unlocks the object.
+         * @throws ActiveMQException
+         */
+        virtual void unlock() throw( exceptions::ActiveMQException ){ 
+            mutex.unlock();
+        }
+        
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to Notify.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void wait() throw( exceptions::ActiveMQException ){
+            mutex.wait();
+        }
+    
+        /**
+         * Waits on a signal from this object, which is generated
+         * by a call to Notify.  Must have this object locked before
+         * calling.  This wait will timeout after the specified time
+         * interval.
+         * @param millisecs the time in millisecsonds to wait, or WAIT_INIFINITE
+         * @throws ActiveMQException
+         */
+        virtual void wait( unsigned long millisecs ) throw( exceptions::ActiveMQException ){
+            mutex.wait(millisecs);
+        }
+
+        /**
+         * Signals a waiter on this object that it can now wake
+         * up and continue.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notify() throw( exceptions::ActiveMQException ){
+            mutex.notify();
+        }
+        
+        /**
+         * Signals the waiters on this object that it can now wake
+         * up and continue.  Must have this object locked before
+         * calling.
+         * @throws ActiveMQException
+         */
+        virtual void notifyAll() throw( exceptions::ActiveMQException ){
+            mutex.notifyAll();
+        }
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_IO_FILTEROUTPUTSTREAM_H_*/

Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h?view=diff&rev=452032&r1=452031&r2=452032
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h Mon Oct  2 06:27:48 2006
@@ -61,6 +61,26 @@
         virtual int read( unsigned char* buffer, const int bufferSize ) 
             throw ( IOException ) = 0;
 
+        /**
+         * Skips over and discards n bytes of data from this input stream. The 
+         * skip method may, for a variety of reasons, end up skipping over some
+         * smaller number of bytes, possibly 0. This may result from any of a
+         * number of conditions; reaching end of file before n bytes have been
+         * skipped is only one possibility. The actual number of bytes skipped
+         * is returned. If n is negative, no bytes are skipped.
+         * <p>
+         * The skip method of InputStream creates a byte array and then 
+         * repeatedly reads into it until n bytes have been read or the end 
+         * of the stream has been reached. Subclasses are encouraged to 
+         * provide a more efficient implementation of this method.
+         * @param num - the number of bytes to skip
+         * @returns total butes skipped
+         * @throws IOException if an error occurs
+         */ 
+        virtual int skip( int num ) throw ( io::IOException ) {
+            return 0;
+        };
+
     };
     
 }}