You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2006/10/02 15:27:49 UTC
svn commit: r452032 - in
/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io:
EOFException.h FilterInputStream.h FilterOutputStream.h InputStream.h
Author: tabish
Date: Mon Oct 2 06:27:48 2006
New Revision: 452032
URL: http://svn.apache.org/viewvc?view=rev&rev=452032
Log:
Adding new code for Openwire Support
Added:
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h (with props)
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h (with props)
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h (with props)
Modified:
incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h
Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h?view=auto&rev=452032
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h (added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h Mon Oct 2 06:27:48 2006
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_EOFEXCEPTION_H
+#define ACTIVEMQ_IO_EOFEXCEPTION_H
+
+#include <activemq/io/IOException.h>
+
+namespace activemq{
+namespace io{
+
+ /*
+ * Signals that an End of File exception has occurred.
+ */
+ class EOFException : public io::IOException
+ {
+ public:
+
+ /**
+ * Default Constructor
+ */
+ EOFException(){}
+
+ /**
+ * Copy Constructor
+ * @param ex the exception to copy
+ */
+ EOFException( const exceptions::ActiveMQException& ex ){
+ *(exceptions::ActiveMQException*)this = ex;
+ }
+
+ /**
+ * Copy Constructor
+ * @param ex the exception to copy, which is an instance of this type
+ */
+ EOFException( const EOFException& ex ){
+ *(exceptions::ActiveMQException*)this = ex;
+ }
+
+ /**
+ * Consturctor
+ * @param file name of the file were the exception occured.
+ * @param lineNumber line where the exception occured
+ * @param msg the message that was generated
+ */
+ EOFException( const char* file, const int lineNumber,
+ const char* msg, ... )
+ {
+ va_list vargs;
+ va_start( vargs, msg );
+ buildMessage( msg, vargs );
+
+ // Set the first mark for this exception.
+ setMark( file, lineNumber );
+ }
+
+ /**
+ * Clones this exception. This is useful for cases where you need
+ * to preserve the type of the original exception as well as the message.
+ * All subclasses should override.
+ */
+ virtual exceptions::ActiveMQException* clone() const{
+ return new EOFException( *this );
+ }
+
+ virtual ~EOFException(){}
+
+ };
+
+}}
+
+#endif /*ACTIVEMQ_IO_EOFEXCEPTION_H*/
Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/EOFException.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h?view=auto&rev=452032
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h (added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h Mon Oct 2 06:27:48 2006
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_IO_FILTERINPUTSTREAM_H_
+#define _ACTIVEMQ_IO_FILTERINPUTSTREAM_H_
+
+#include <activemq/io/InputStream.h>
+#include <activemq/io/IOException.h>
+#include <activemq/concurrent/Mutex.h>
+
+namespace activemq{
+namespace io{
+
+ /**
+ * A FilterInputStream contains some other input stream, which it uses
+ * as its basic source of data, possibly transforming the data along the
+ * way or providing additional functionality. The class FilterInputStream
+ * itself simply overrides all methods of InputStream with versions
+ * that pass all requests to the contained input stream. Subclasses of
+ * FilterInputStream may further override some of these methods and may
+ * also provide additional methods and fields.
+ */
+ class FilterInputStream : public InputStream
+ {
+ protected:
+
+ // The input stream to wrap
+ InputStream* inputStream;
+
+ // Synchronization object.
+ concurrent::Mutex mutex;
+
+ // Indicates if we own the wrapped stream
+ bool own;
+
+ public:
+
+ /**
+ * Constructor to create a wrapped InputStream
+ * @param inputStream the stream to wrap and filter
+ * @param own indicates if we own the stream object, defaults to false
+ */
+ FilterInputStream( InputStream* inputStream, bool own = false ) {
+ this->inputStream = inputStream;
+ this->own = own;
+ }
+
+ virtual ~FilterInputStream() {
+ try {
+ if( own == true ) delete inputStream;
+ }
+ AMQ_CATCH_NOTHROW( IOException )
+ AMQ_CATCHALL_NOTHROW( )
+ }
+
+ /**
+ * Returns the number of bytes that can be read from this input stream
+ * without blocking. This method simply performs in.available() and
+ * returns the result.
+ * @return the number of bytes available without blocking.
+ */
+ virtual int available() const throw ( IOException ) {
+ try {
+ return inputStream->available();
+ }
+ AMQ_CATCH_RETHROW( IOException )
+ AMQ_CATCHALL_THROW( IOException )
+ }
+
+ /**
+ * Reads the next byte of data from this input stream. The value byte
+ * is returned as an unsigned char in the range 0 to 255. If no byte is
+ * available because the end of the stream has been reached, the value
+ * -1 is returned. This method blocks until input data is available,
+ * the end of the stream is detected, or an exception is thrown.
+ * This method simply performs in.read() and returns the result.
+ * @return The next byte.
+ * @throws IOException thrown if an error occurs.
+ */
+ virtual unsigned char read() throw ( IOException ) {
+ try {
+ return inputStream->read();
+ }
+ AMQ_CATCH_RETHROW( IOException )
+ AMQ_CATCHALL_THROW( IOException )
+ }
+
+ /**
+ * Reads up to len bytes of data from this input stream into an array
+ * of bytes. This method blocks until some input is available.
+ * This method simply performs in.read(b, len) and returns the result.
+ * @param buffer (out) the target buffer.
+ * @param bufferSize the size of the output buffer.
+ * @return The number of bytes read or -1 if EOS is detected
+ * @throws IOException thrown if an error occurs.
+ */
+ virtual int read( unsigned char* buffer, const int bufferSize )
+ throw ( IOException )
+ {
+ try {
+ return inputStream->read( buffer, bufferSize );
+ }
+ AMQ_CATCH_RETHROW( IOException )
+ AMQ_CATCHALL_THROW( IOException )
+ }
+
+ /**
+ * Close the Stream, the FilterOutputStream simply calls the close
+ * method of the underlying stream
+ * @throws CMSException
+ */
+ virtual void close() throw ( cms::CMSException ) {
+ try {
+ inputStream->close();
+ }
+ AMQ_CATCH_RETHROW( IOException )
+ AMQ_CATCHALL_THROW( IOException )
+ }
+
+ public: // Synchronizable
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void lock() throw( exceptions::ActiveMQException ){
+ mutex.lock();
+ }
+
+ /**
+ * Unlocks the object.
+ * @throws ActiveMQException
+ */
+ virtual void unlock() throw( exceptions::ActiveMQException ){
+ mutex.unlock();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void wait() throw( exceptions::ActiveMQException ){
+ mutex.wait();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling. This wait will timeout after the specified time
+ * interval.
+ * @param millisecs the time in millisecsonds to wait, or WAIT_INIFINITE
+ * @throws ActiveMQException
+ */
+ virtual void wait( unsigned long millisecs ) throw( exceptions::ActiveMQException ){
+ mutex.wait(millisecs);
+ }
+
+ /**
+ * Signals a waiter on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void notify() throw( exceptions::ActiveMQException ){
+ mutex.notify();
+ }
+
+ /**
+ * Signals the waiters on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void notifyAll() throw( exceptions::ActiveMQException ){
+ mutex.notifyAll();
+ }
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_IO_FILTERINPUTSTREAM_H_*/
Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterInputStream.h
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h?view=auto&rev=452032
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h (added)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h Mon Oct 2 06:27:48 2006
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_IO_FILTEROUTPUTSTREAM_H_
+#define _ACTIVEMQ_IO_FILTEROUTPUTSTREAM_H_
+
+#include <activemq/io/OutputStream.h>
+#include <activemq/io/IOException.h>
+#include <activemq/concurrent/Mutex.h>
+
+namespace activemq{
+namespace io{
+
+ /**
+ * This class is the superclass of all classes that filter output
+ * streams. These streams sit on top of an already existing output
+ * stream (the underlying output stream) which it uses as its basic
+ * sink of data, but possibly transforming the data along the way or
+ * providing additional functionality.
+ *
+ * The class FilterOutputStream itself simply overrides all methods of
+ * OutputStream with versions that pass all requests to the underlying
+ * output stream. Subclasses of FilterOutputStream may further override
+ * some of these methods as well as provide additional methods and
+ * fields.
+ *
+ * Due to the lack of garbage collection in C++ a design decision was
+ * made to add a boolean parameter to the constructor indicating if the
+ * wrapped <code>InputStream</code> is owned by this object. That way
+ * creation of the underlying stream can occur in a Java like way. Ex:
+ *
+ * DataOutputStream os = new DataOutputStream( new OutputStream(), true )
+ */
+ class FilterOutputStream : public OutputStream
+ {
+ protected:
+
+ // The output Stream to wrap
+ OutputStream* outputStream;
+
+ // Synchronization object.
+ concurrent::Mutex mutex;
+
+ // Indicates if we own the wrapped stream
+ bool own;
+
+ public:
+
+ /**
+ * Constructor, creates a wrapped output stream
+ * @param outputStream the OutputStream to wrap
+ */
+ FilterOutputStream( OutputStream* outputStream, bool own = false ){
+ this->outputStream = outputStream;
+ this->own = own;
+ }
+
+ virtual ~FilterOutputStream() {
+ try {
+ if( own == true ) delete outputStream;
+ }
+ AMQ_CATCH_NOTHROW( IOException )
+ AMQ_CATCHALL_NOTHROW( )
+ }
+
+ /**
+ * Writes a single byte to the output stream. The write method of
+ * FilterOutputStream calls the write method of its underlying output
+ * stream, that is, it performs out.write(b).
+ * @param c the byte.
+ * @throws IOException thrown if an error occurs.
+ */
+ virtual void write( const unsigned char c ) throw ( IOException ) {
+ try {
+ outputStream->write( c );
+ }
+ AMQ_CATCH_RETHROW( IOException )
+ AMQ_CATCHALL_THROW( IOException )
+ }
+
+ /**
+ * Writes an array of bytes to the output stream. The write method of
+ * FilterOutputStream calls the write method of one argument on each
+ * byte to output.
+ * @param buffer The array of bytes to write.
+ * @param len The number of bytes from the buffer to be written.
+ * @throws IOException thrown if an error occurs.
+ */
+ virtual void write( const unsigned char* buffer, const int len ) throw ( IOException ) {
+ try {
+ for( int ix = 0; ix < len; ++ix )
+ {
+ outputStream->write( buffer[ix] );
+ }
+ }
+ AMQ_CATCH_RETHROW( IOException )
+ AMQ_CATCHALL_THROW( IOException )
+ }
+
+ /**
+ * Flushes any pending writes in this output stream.
+ * The flush method of FilterOutputStream calls the flush method
+ * of its underlying output stream
+ * @throws IOException
+ */
+ virtual void flush() throw ( IOException ) {
+ try {
+ outputStream->flush();
+ }
+ AMQ_CATCH_RETHROW( IOException )
+ AMQ_CATCHALL_THROW( IOException )
+ }
+
+ /**
+ * Close the Stream, the FilterOutputStream simply calls the close
+ * method of the underlying stream
+ * @throws CMSException
+ */
+ virtual void close() throw ( cms::CMSException ) {
+ try {
+ outputStream->close();
+ }
+ AMQ_CATCH_RETHROW( IOException )
+ AMQ_CATCHALL_THROW( IOException )
+ }
+
+ public: // Synchronizable
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void lock() throw( exceptions::ActiveMQException ){
+ mutex.lock();
+ }
+
+ /**
+ * Unlocks the object.
+ * @throws ActiveMQException
+ */
+ virtual void unlock() throw( exceptions::ActiveMQException ){
+ mutex.unlock();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void wait() throw( exceptions::ActiveMQException ){
+ mutex.wait();
+ }
+
+ /**
+ * Waits on a signal from this object, which is generated
+ * by a call to Notify. Must have this object locked before
+ * calling. This wait will timeout after the specified time
+ * interval.
+ * @param millisecs the time in millisecsonds to wait, or WAIT_INIFINITE
+ * @throws ActiveMQException
+ */
+ virtual void wait( unsigned long millisecs ) throw( exceptions::ActiveMQException ){
+ mutex.wait(millisecs);
+ }
+
+ /**
+ * Signals a waiter on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void notify() throw( exceptions::ActiveMQException ){
+ mutex.notify();
+ }
+
+ /**
+ * Signals the waiters on this object that it can now wake
+ * up and continue. Must have this object locked before
+ * calling.
+ * @throws ActiveMQException
+ */
+ virtual void notifyAll() throw( exceptions::ActiveMQException ){
+ mutex.notifyAll();
+ }
+
+ };
+
+}}
+
+#endif /*_ACTIVEMQ_IO_FILTEROUTPUTSTREAM_H_*/
Propchange: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/FilterOutputStream.h
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h?view=diff&rev=452032&r1=452031&r2=452032
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h (original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/io/InputStream.h Mon Oct 2 06:27:48 2006
@@ -61,6 +61,26 @@
virtual int read( unsigned char* buffer, const int bufferSize )
throw ( IOException ) = 0;
+ /**
+ * Skips over and discards n bytes of data from this input stream. The
+ * skip method may, for a variety of reasons, end up skipping over some
+ * smaller number of bytes, possibly 0. This may result from any of a
+ * number of conditions; reaching end of file before n bytes have been
+ * skipped is only one possibility. The actual number of bytes skipped
+ * is returned. If n is negative, no bytes are skipped.
+ * <p>
+ * The skip method of InputStream creates a byte array and then
+ * repeatedly reads into it until n bytes have been read or the end
+ * of the stream has been reached. Subclasses are encouraged to
+ * provide a more efficient implementation of this method.
+ * @param num - the number of bytes to skip
+ * @returns total butes skipped
+ * @throws IOException if an error occurs
+ */
+ virtual int skip( int num ) throw ( io::IOException ) {
+ return 0;
+ };
+
};
}}